- Add debounced state updates for title and content (500ms delay) - Immediate UI updates with delayed history saving - Prevent one-letter-per-undo issue - Add cleanup for debounce timers on unmount
1086 lines
49 KiB
JavaScript
1086 lines
49 KiB
JavaScript
import { safeParse } from '../server/zod-compat.js';
|
|
import { CancelledNotificationSchema, CreateTaskResultSchema, ErrorCode, GetTaskRequestSchema, GetTaskResultSchema, GetTaskPayloadRequestSchema, ListTasksRequestSchema, ListTasksResultSchema, CancelTaskRequestSchema, CancelTaskResultSchema, isJSONRPCErrorResponse, isJSONRPCRequest, isJSONRPCResultResponse, isJSONRPCNotification, McpError, PingRequestSchema, ProgressNotificationSchema, RELATED_TASK_META_KEY, TaskStatusNotificationSchema, isTaskAugmentedRequestParams } from '../types.js';
|
|
import { isTerminal } from '../experimental/tasks/interfaces.js';
|
|
import { getMethodLiteral, parseWithCompat } from '../server/zod-json-schema-compat.js';
|
|
/**
|
|
* The default request timeout, in miliseconds.
|
|
*/
|
|
export const DEFAULT_REQUEST_TIMEOUT_MSEC = 60000;
|
|
/**
|
|
* Implements MCP protocol framing on top of a pluggable transport, including
|
|
* features like request/response linking, notifications, and progress.
|
|
*/
|
|
export class Protocol {
|
|
constructor(_options) {
|
|
this._options = _options;
|
|
this._requestMessageId = 0;
|
|
this._requestHandlers = new Map();
|
|
this._requestHandlerAbortControllers = new Map();
|
|
this._notificationHandlers = new Map();
|
|
this._responseHandlers = new Map();
|
|
this._progressHandlers = new Map();
|
|
this._timeoutInfo = new Map();
|
|
this._pendingDebouncedNotifications = new Set();
|
|
// Maps task IDs to progress tokens to keep handlers alive after CreateTaskResult
|
|
this._taskProgressTokens = new Map();
|
|
this._requestResolvers = new Map();
|
|
this.setNotificationHandler(CancelledNotificationSchema, notification => {
|
|
this._oncancel(notification);
|
|
});
|
|
this.setNotificationHandler(ProgressNotificationSchema, notification => {
|
|
this._onprogress(notification);
|
|
});
|
|
this.setRequestHandler(PingRequestSchema,
|
|
// Automatic pong by default.
|
|
_request => ({}));
|
|
// Install task handlers if TaskStore is provided
|
|
this._taskStore = _options?.taskStore;
|
|
this._taskMessageQueue = _options?.taskMessageQueue;
|
|
if (this._taskStore) {
|
|
this.setRequestHandler(GetTaskRequestSchema, async (request, extra) => {
|
|
const task = await this._taskStore.getTask(request.params.taskId, extra.sessionId);
|
|
if (!task) {
|
|
throw new McpError(ErrorCode.InvalidParams, 'Failed to retrieve task: Task not found');
|
|
}
|
|
// Per spec: tasks/get responses SHALL NOT include related-task metadata
|
|
// as the taskId parameter is the source of truth
|
|
// @ts-expect-error SendResultT cannot contain GetTaskResult, but we include it in our derived types everywhere else
|
|
return {
|
|
...task
|
|
};
|
|
});
|
|
this.setRequestHandler(GetTaskPayloadRequestSchema, async (request, extra) => {
|
|
const handleTaskResult = async () => {
|
|
const taskId = request.params.taskId;
|
|
// Deliver queued messages
|
|
if (this._taskMessageQueue) {
|
|
let queuedMessage;
|
|
while ((queuedMessage = await this._taskMessageQueue.dequeue(taskId, extra.sessionId))) {
|
|
// Handle response and error messages by routing them to the appropriate resolver
|
|
if (queuedMessage.type === 'response' || queuedMessage.type === 'error') {
|
|
const message = queuedMessage.message;
|
|
const requestId = message.id;
|
|
// Lookup resolver in _requestResolvers map
|
|
const resolver = this._requestResolvers.get(requestId);
|
|
if (resolver) {
|
|
// Remove resolver from map after invocation
|
|
this._requestResolvers.delete(requestId);
|
|
// Invoke resolver with response or error
|
|
if (queuedMessage.type === 'response') {
|
|
resolver(message);
|
|
}
|
|
else {
|
|
// Convert JSONRPCError to McpError
|
|
const errorMessage = message;
|
|
const error = new McpError(errorMessage.error.code, errorMessage.error.message, errorMessage.error.data);
|
|
resolver(error);
|
|
}
|
|
}
|
|
else {
|
|
// Handle missing resolver gracefully with error logging
|
|
const messageType = queuedMessage.type === 'response' ? 'Response' : 'Error';
|
|
this._onerror(new Error(`${messageType} handler missing for request ${requestId}`));
|
|
}
|
|
// Continue to next message
|
|
continue;
|
|
}
|
|
// Send the message on the response stream by passing the relatedRequestId
|
|
// This tells the transport to write the message to the tasks/result response stream
|
|
await this._transport?.send(queuedMessage.message, { relatedRequestId: extra.requestId });
|
|
}
|
|
}
|
|
// Now check task status
|
|
const task = await this._taskStore.getTask(taskId, extra.sessionId);
|
|
if (!task) {
|
|
throw new McpError(ErrorCode.InvalidParams, `Task not found: ${taskId}`);
|
|
}
|
|
// Block if task is not terminal (we've already delivered all queued messages above)
|
|
if (!isTerminal(task.status)) {
|
|
// Wait for status change or new messages
|
|
await this._waitForTaskUpdate(taskId, extra.signal);
|
|
// After waking up, recursively call to deliver any new messages or result
|
|
return await handleTaskResult();
|
|
}
|
|
// If task is terminal, return the result
|
|
if (isTerminal(task.status)) {
|
|
const result = await this._taskStore.getTaskResult(taskId, extra.sessionId);
|
|
this._clearTaskQueue(taskId);
|
|
return {
|
|
...result,
|
|
_meta: {
|
|
...result._meta,
|
|
[RELATED_TASK_META_KEY]: {
|
|
taskId: taskId
|
|
}
|
|
}
|
|
};
|
|
}
|
|
return await handleTaskResult();
|
|
};
|
|
return await handleTaskResult();
|
|
});
|
|
this.setRequestHandler(ListTasksRequestSchema, async (request, extra) => {
|
|
try {
|
|
const { tasks, nextCursor } = await this._taskStore.listTasks(request.params?.cursor, extra.sessionId);
|
|
// @ts-expect-error SendResultT cannot contain ListTasksResult, but we include it in our derived types everywhere else
|
|
return {
|
|
tasks,
|
|
nextCursor,
|
|
_meta: {}
|
|
};
|
|
}
|
|
catch (error) {
|
|
throw new McpError(ErrorCode.InvalidParams, `Failed to list tasks: ${error instanceof Error ? error.message : String(error)}`);
|
|
}
|
|
});
|
|
this.setRequestHandler(CancelTaskRequestSchema, async (request, extra) => {
|
|
try {
|
|
// Get the current task to check if it's in a terminal state, in case the implementation is not atomic
|
|
const task = await this._taskStore.getTask(request.params.taskId, extra.sessionId);
|
|
if (!task) {
|
|
throw new McpError(ErrorCode.InvalidParams, `Task not found: ${request.params.taskId}`);
|
|
}
|
|
// Reject cancellation of terminal tasks
|
|
if (isTerminal(task.status)) {
|
|
throw new McpError(ErrorCode.InvalidParams, `Cannot cancel task in terminal status: ${task.status}`);
|
|
}
|
|
await this._taskStore.updateTaskStatus(request.params.taskId, 'cancelled', 'Client cancelled task execution.', extra.sessionId);
|
|
this._clearTaskQueue(request.params.taskId);
|
|
const cancelledTask = await this._taskStore.getTask(request.params.taskId, extra.sessionId);
|
|
if (!cancelledTask) {
|
|
// Task was deleted during cancellation (e.g., cleanup happened)
|
|
throw new McpError(ErrorCode.InvalidParams, `Task not found after cancellation: ${request.params.taskId}`);
|
|
}
|
|
return {
|
|
_meta: {},
|
|
...cancelledTask
|
|
};
|
|
}
|
|
catch (error) {
|
|
// Re-throw McpError as-is
|
|
if (error instanceof McpError) {
|
|
throw error;
|
|
}
|
|
throw new McpError(ErrorCode.InvalidRequest, `Failed to cancel task: ${error instanceof Error ? error.message : String(error)}`);
|
|
}
|
|
});
|
|
}
|
|
}
|
|
async _oncancel(notification) {
|
|
if (!notification.params.requestId) {
|
|
return;
|
|
}
|
|
// Handle request cancellation
|
|
const controller = this._requestHandlerAbortControllers.get(notification.params.requestId);
|
|
controller?.abort(notification.params.reason);
|
|
}
|
|
_setupTimeout(messageId, timeout, maxTotalTimeout, onTimeout, resetTimeoutOnProgress = false) {
|
|
this._timeoutInfo.set(messageId, {
|
|
timeoutId: setTimeout(onTimeout, timeout),
|
|
startTime: Date.now(),
|
|
timeout,
|
|
maxTotalTimeout,
|
|
resetTimeoutOnProgress,
|
|
onTimeout
|
|
});
|
|
}
|
|
_resetTimeout(messageId) {
|
|
const info = this._timeoutInfo.get(messageId);
|
|
if (!info)
|
|
return false;
|
|
const totalElapsed = Date.now() - info.startTime;
|
|
if (info.maxTotalTimeout && totalElapsed >= info.maxTotalTimeout) {
|
|
this._timeoutInfo.delete(messageId);
|
|
throw McpError.fromError(ErrorCode.RequestTimeout, 'Maximum total timeout exceeded', {
|
|
maxTotalTimeout: info.maxTotalTimeout,
|
|
totalElapsed
|
|
});
|
|
}
|
|
clearTimeout(info.timeoutId);
|
|
info.timeoutId = setTimeout(info.onTimeout, info.timeout);
|
|
return true;
|
|
}
|
|
_cleanupTimeout(messageId) {
|
|
const info = this._timeoutInfo.get(messageId);
|
|
if (info) {
|
|
clearTimeout(info.timeoutId);
|
|
this._timeoutInfo.delete(messageId);
|
|
}
|
|
}
|
|
/**
|
|
* Attaches to the given transport, starts it, and starts listening for messages.
|
|
*
|
|
* The Protocol object assumes ownership of the Transport, replacing any callbacks that have already been set, and expects that it is the only user of the Transport instance going forward.
|
|
*/
|
|
async connect(transport) {
|
|
this._transport = transport;
|
|
const _onclose = this.transport?.onclose;
|
|
this._transport.onclose = () => {
|
|
_onclose?.();
|
|
this._onclose();
|
|
};
|
|
const _onerror = this.transport?.onerror;
|
|
this._transport.onerror = (error) => {
|
|
_onerror?.(error);
|
|
this._onerror(error);
|
|
};
|
|
const _onmessage = this._transport?.onmessage;
|
|
this._transport.onmessage = (message, extra) => {
|
|
_onmessage?.(message, extra);
|
|
if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) {
|
|
this._onresponse(message);
|
|
}
|
|
else if (isJSONRPCRequest(message)) {
|
|
this._onrequest(message, extra);
|
|
}
|
|
else if (isJSONRPCNotification(message)) {
|
|
this._onnotification(message);
|
|
}
|
|
else {
|
|
this._onerror(new Error(`Unknown message type: ${JSON.stringify(message)}`));
|
|
}
|
|
};
|
|
await this._transport.start();
|
|
}
|
|
_onclose() {
|
|
const responseHandlers = this._responseHandlers;
|
|
this._responseHandlers = new Map();
|
|
this._progressHandlers.clear();
|
|
this._taskProgressTokens.clear();
|
|
this._pendingDebouncedNotifications.clear();
|
|
const error = McpError.fromError(ErrorCode.ConnectionClosed, 'Connection closed');
|
|
this._transport = undefined;
|
|
this.onclose?.();
|
|
for (const handler of responseHandlers.values()) {
|
|
handler(error);
|
|
}
|
|
}
|
|
_onerror(error) {
|
|
this.onerror?.(error);
|
|
}
|
|
_onnotification(notification) {
|
|
const handler = this._notificationHandlers.get(notification.method) ?? this.fallbackNotificationHandler;
|
|
// Ignore notifications not being subscribed to.
|
|
if (handler === undefined) {
|
|
return;
|
|
}
|
|
// Starting with Promise.resolve() puts any synchronous errors into the monad as well.
|
|
Promise.resolve()
|
|
.then(() => handler(notification))
|
|
.catch(error => this._onerror(new Error(`Uncaught error in notification handler: ${error}`)));
|
|
}
|
|
_onrequest(request, extra) {
|
|
const handler = this._requestHandlers.get(request.method) ?? this.fallbackRequestHandler;
|
|
// Capture the current transport at request time to ensure responses go to the correct client
|
|
const capturedTransport = this._transport;
|
|
// Extract taskId from request metadata if present (needed early for method not found case)
|
|
const relatedTaskId = request.params?._meta?.[RELATED_TASK_META_KEY]?.taskId;
|
|
if (handler === undefined) {
|
|
const errorResponse = {
|
|
jsonrpc: '2.0',
|
|
id: request.id,
|
|
error: {
|
|
code: ErrorCode.MethodNotFound,
|
|
message: 'Method not found'
|
|
}
|
|
};
|
|
// Queue or send the error response based on whether this is a task-related request
|
|
if (relatedTaskId && this._taskMessageQueue) {
|
|
this._enqueueTaskMessage(relatedTaskId, {
|
|
type: 'error',
|
|
message: errorResponse,
|
|
timestamp: Date.now()
|
|
}, capturedTransport?.sessionId).catch(error => this._onerror(new Error(`Failed to enqueue error response: ${error}`)));
|
|
}
|
|
else {
|
|
capturedTransport
|
|
?.send(errorResponse)
|
|
.catch(error => this._onerror(new Error(`Failed to send an error response: ${error}`)));
|
|
}
|
|
return;
|
|
}
|
|
const abortController = new AbortController();
|
|
this._requestHandlerAbortControllers.set(request.id, abortController);
|
|
const taskCreationParams = isTaskAugmentedRequestParams(request.params) ? request.params.task : undefined;
|
|
const taskStore = this._taskStore ? this.requestTaskStore(request, capturedTransport?.sessionId) : undefined;
|
|
const fullExtra = {
|
|
signal: abortController.signal,
|
|
sessionId: capturedTransport?.sessionId,
|
|
_meta: request.params?._meta,
|
|
sendNotification: async (notification) => {
|
|
// Include related-task metadata if this request is part of a task
|
|
const notificationOptions = { relatedRequestId: request.id };
|
|
if (relatedTaskId) {
|
|
notificationOptions.relatedTask = { taskId: relatedTaskId };
|
|
}
|
|
await this.notification(notification, notificationOptions);
|
|
},
|
|
sendRequest: async (r, resultSchema, options) => {
|
|
// Include related-task metadata if this request is part of a task
|
|
const requestOptions = { ...options, relatedRequestId: request.id };
|
|
if (relatedTaskId && !requestOptions.relatedTask) {
|
|
requestOptions.relatedTask = { taskId: relatedTaskId };
|
|
}
|
|
// Set task status to input_required when sending a request within a task context
|
|
// Use the taskId from options (explicit) or fall back to relatedTaskId (inherited)
|
|
const effectiveTaskId = requestOptions.relatedTask?.taskId ?? relatedTaskId;
|
|
if (effectiveTaskId && taskStore) {
|
|
await taskStore.updateTaskStatus(effectiveTaskId, 'input_required');
|
|
}
|
|
return await this.request(r, resultSchema, requestOptions);
|
|
},
|
|
authInfo: extra?.authInfo,
|
|
requestId: request.id,
|
|
requestInfo: extra?.requestInfo,
|
|
taskId: relatedTaskId,
|
|
taskStore: taskStore,
|
|
taskRequestedTtl: taskCreationParams?.ttl,
|
|
closeSSEStream: extra?.closeSSEStream,
|
|
closeStandaloneSSEStream: extra?.closeStandaloneSSEStream
|
|
};
|
|
// Starting with Promise.resolve() puts any synchronous errors into the monad as well.
|
|
Promise.resolve()
|
|
.then(() => {
|
|
// If this request asked for task creation, check capability first
|
|
if (taskCreationParams) {
|
|
// Check if the request method supports task creation
|
|
this.assertTaskHandlerCapability(request.method);
|
|
}
|
|
})
|
|
.then(() => handler(request, fullExtra))
|
|
.then(async (result) => {
|
|
if (abortController.signal.aborted) {
|
|
// Request was cancelled
|
|
return;
|
|
}
|
|
const response = {
|
|
result,
|
|
jsonrpc: '2.0',
|
|
id: request.id
|
|
};
|
|
// Queue or send the response based on whether this is a task-related request
|
|
if (relatedTaskId && this._taskMessageQueue) {
|
|
await this._enqueueTaskMessage(relatedTaskId, {
|
|
type: 'response',
|
|
message: response,
|
|
timestamp: Date.now()
|
|
}, capturedTransport?.sessionId);
|
|
}
|
|
else {
|
|
await capturedTransport?.send(response);
|
|
}
|
|
}, async (error) => {
|
|
if (abortController.signal.aborted) {
|
|
// Request was cancelled
|
|
return;
|
|
}
|
|
const errorResponse = {
|
|
jsonrpc: '2.0',
|
|
id: request.id,
|
|
error: {
|
|
code: Number.isSafeInteger(error['code']) ? error['code'] : ErrorCode.InternalError,
|
|
message: error.message ?? 'Internal error',
|
|
...(error['data'] !== undefined && { data: error['data'] })
|
|
}
|
|
};
|
|
// Queue or send the error response based on whether this is a task-related request
|
|
if (relatedTaskId && this._taskMessageQueue) {
|
|
await this._enqueueTaskMessage(relatedTaskId, {
|
|
type: 'error',
|
|
message: errorResponse,
|
|
timestamp: Date.now()
|
|
}, capturedTransport?.sessionId);
|
|
}
|
|
else {
|
|
await capturedTransport?.send(errorResponse);
|
|
}
|
|
})
|
|
.catch(error => this._onerror(new Error(`Failed to send response: ${error}`)))
|
|
.finally(() => {
|
|
this._requestHandlerAbortControllers.delete(request.id);
|
|
});
|
|
}
|
|
_onprogress(notification) {
|
|
const { progressToken, ...params } = notification.params;
|
|
const messageId = Number(progressToken);
|
|
const handler = this._progressHandlers.get(messageId);
|
|
if (!handler) {
|
|
this._onerror(new Error(`Received a progress notification for an unknown token: ${JSON.stringify(notification)}`));
|
|
return;
|
|
}
|
|
const responseHandler = this._responseHandlers.get(messageId);
|
|
const timeoutInfo = this._timeoutInfo.get(messageId);
|
|
if (timeoutInfo && responseHandler && timeoutInfo.resetTimeoutOnProgress) {
|
|
try {
|
|
this._resetTimeout(messageId);
|
|
}
|
|
catch (error) {
|
|
// Clean up if maxTotalTimeout was exceeded
|
|
this._responseHandlers.delete(messageId);
|
|
this._progressHandlers.delete(messageId);
|
|
this._cleanupTimeout(messageId);
|
|
responseHandler(error);
|
|
return;
|
|
}
|
|
}
|
|
handler(params);
|
|
}
|
|
_onresponse(response) {
|
|
const messageId = Number(response.id);
|
|
// Check if this is a response to a queued request
|
|
const resolver = this._requestResolvers.get(messageId);
|
|
if (resolver) {
|
|
this._requestResolvers.delete(messageId);
|
|
if (isJSONRPCResultResponse(response)) {
|
|
resolver(response);
|
|
}
|
|
else {
|
|
const error = new McpError(response.error.code, response.error.message, response.error.data);
|
|
resolver(error);
|
|
}
|
|
return;
|
|
}
|
|
const handler = this._responseHandlers.get(messageId);
|
|
if (handler === undefined) {
|
|
this._onerror(new Error(`Received a response for an unknown message ID: ${JSON.stringify(response)}`));
|
|
return;
|
|
}
|
|
this._responseHandlers.delete(messageId);
|
|
this._cleanupTimeout(messageId);
|
|
// Keep progress handler alive for CreateTaskResult responses
|
|
let isTaskResponse = false;
|
|
if (isJSONRPCResultResponse(response) && response.result && typeof response.result === 'object') {
|
|
const result = response.result;
|
|
if (result.task && typeof result.task === 'object') {
|
|
const task = result.task;
|
|
if (typeof task.taskId === 'string') {
|
|
isTaskResponse = true;
|
|
this._taskProgressTokens.set(task.taskId, messageId);
|
|
}
|
|
}
|
|
}
|
|
if (!isTaskResponse) {
|
|
this._progressHandlers.delete(messageId);
|
|
}
|
|
if (isJSONRPCResultResponse(response)) {
|
|
handler(response);
|
|
}
|
|
else {
|
|
const error = McpError.fromError(response.error.code, response.error.message, response.error.data);
|
|
handler(error);
|
|
}
|
|
}
|
|
get transport() {
|
|
return this._transport;
|
|
}
|
|
/**
|
|
* Closes the connection.
|
|
*/
|
|
async close() {
|
|
await this._transport?.close();
|
|
}
|
|
/**
|
|
* Sends a request and returns an AsyncGenerator that yields response messages.
|
|
* The generator is guaranteed to end with either a 'result' or 'error' message.
|
|
*
|
|
* @example
|
|
* ```typescript
|
|
* const stream = protocol.requestStream(request, resultSchema, options);
|
|
* for await (const message of stream) {
|
|
* switch (message.type) {
|
|
* case 'taskCreated':
|
|
* console.log('Task created:', message.task.taskId);
|
|
* break;
|
|
* case 'taskStatus':
|
|
* console.log('Task status:', message.task.status);
|
|
* break;
|
|
* case 'result':
|
|
* console.log('Final result:', message.result);
|
|
* break;
|
|
* case 'error':
|
|
* console.error('Error:', message.error);
|
|
* break;
|
|
* }
|
|
* }
|
|
* ```
|
|
*
|
|
* @experimental Use `client.experimental.tasks.requestStream()` to access this method.
|
|
*/
|
|
async *requestStream(request, resultSchema, options) {
|
|
const { task } = options ?? {};
|
|
// For non-task requests, just yield the result
|
|
if (!task) {
|
|
try {
|
|
const result = await this.request(request, resultSchema, options);
|
|
yield { type: 'result', result };
|
|
}
|
|
catch (error) {
|
|
yield {
|
|
type: 'error',
|
|
error: error instanceof McpError ? error : new McpError(ErrorCode.InternalError, String(error))
|
|
};
|
|
}
|
|
return;
|
|
}
|
|
// For task-augmented requests, we need to poll for status
|
|
// First, make the request to create the task
|
|
let taskId;
|
|
try {
|
|
// Send the request and get the CreateTaskResult
|
|
const createResult = await this.request(request, CreateTaskResultSchema, options);
|
|
// Extract taskId from the result
|
|
if (createResult.task) {
|
|
taskId = createResult.task.taskId;
|
|
yield { type: 'taskCreated', task: createResult.task };
|
|
}
|
|
else {
|
|
throw new McpError(ErrorCode.InternalError, 'Task creation did not return a task');
|
|
}
|
|
// Poll for task completion
|
|
while (true) {
|
|
// Get current task status
|
|
const task = await this.getTask({ taskId }, options);
|
|
yield { type: 'taskStatus', task };
|
|
// Check if task is terminal
|
|
if (isTerminal(task.status)) {
|
|
if (task.status === 'completed') {
|
|
// Get the final result
|
|
const result = await this.getTaskResult({ taskId }, resultSchema, options);
|
|
yield { type: 'result', result };
|
|
}
|
|
else if (task.status === 'failed') {
|
|
yield {
|
|
type: 'error',
|
|
error: new McpError(ErrorCode.InternalError, `Task ${taskId} failed`)
|
|
};
|
|
}
|
|
else if (task.status === 'cancelled') {
|
|
yield {
|
|
type: 'error',
|
|
error: new McpError(ErrorCode.InternalError, `Task ${taskId} was cancelled`)
|
|
};
|
|
}
|
|
return;
|
|
}
|
|
// When input_required, call tasks/result to deliver queued messages
|
|
// (elicitation, sampling) via SSE and block until terminal
|
|
if (task.status === 'input_required') {
|
|
const result = await this.getTaskResult({ taskId }, resultSchema, options);
|
|
yield { type: 'result', result };
|
|
return;
|
|
}
|
|
// Wait before polling again
|
|
const pollInterval = task.pollInterval ?? this._options?.defaultTaskPollInterval ?? 1000;
|
|
await new Promise(resolve => setTimeout(resolve, pollInterval));
|
|
// Check if cancelled
|
|
options?.signal?.throwIfAborted();
|
|
}
|
|
}
|
|
catch (error) {
|
|
yield {
|
|
type: 'error',
|
|
error: error instanceof McpError ? error : new McpError(ErrorCode.InternalError, String(error))
|
|
};
|
|
}
|
|
}
|
|
/**
|
|
* Sends a request and waits for a response.
|
|
*
|
|
* Do not use this method to emit notifications! Use notification() instead.
|
|
*/
|
|
request(request, resultSchema, options) {
|
|
const { relatedRequestId, resumptionToken, onresumptiontoken, task, relatedTask } = options ?? {};
|
|
// Send the request
|
|
return new Promise((resolve, reject) => {
|
|
const earlyReject = (error) => {
|
|
reject(error);
|
|
};
|
|
if (!this._transport) {
|
|
earlyReject(new Error('Not connected'));
|
|
return;
|
|
}
|
|
if (this._options?.enforceStrictCapabilities === true) {
|
|
try {
|
|
this.assertCapabilityForMethod(request.method);
|
|
// If task creation is requested, also check task capabilities
|
|
if (task) {
|
|
this.assertTaskCapability(request.method);
|
|
}
|
|
}
|
|
catch (e) {
|
|
earlyReject(e);
|
|
return;
|
|
}
|
|
}
|
|
options?.signal?.throwIfAborted();
|
|
const messageId = this._requestMessageId++;
|
|
const jsonrpcRequest = {
|
|
...request,
|
|
jsonrpc: '2.0',
|
|
id: messageId
|
|
};
|
|
if (options?.onprogress) {
|
|
this._progressHandlers.set(messageId, options.onprogress);
|
|
jsonrpcRequest.params = {
|
|
...request.params,
|
|
_meta: {
|
|
...(request.params?._meta || {}),
|
|
progressToken: messageId
|
|
}
|
|
};
|
|
}
|
|
// Augment with task creation parameters if provided
|
|
if (task) {
|
|
jsonrpcRequest.params = {
|
|
...jsonrpcRequest.params,
|
|
task: task
|
|
};
|
|
}
|
|
// Augment with related task metadata if relatedTask is provided
|
|
if (relatedTask) {
|
|
jsonrpcRequest.params = {
|
|
...jsonrpcRequest.params,
|
|
_meta: {
|
|
...(jsonrpcRequest.params?._meta || {}),
|
|
[RELATED_TASK_META_KEY]: relatedTask
|
|
}
|
|
};
|
|
}
|
|
const cancel = (reason) => {
|
|
this._responseHandlers.delete(messageId);
|
|
this._progressHandlers.delete(messageId);
|
|
this._cleanupTimeout(messageId);
|
|
this._transport
|
|
?.send({
|
|
jsonrpc: '2.0',
|
|
method: 'notifications/cancelled',
|
|
params: {
|
|
requestId: messageId,
|
|
reason: String(reason)
|
|
}
|
|
}, { relatedRequestId, resumptionToken, onresumptiontoken })
|
|
.catch(error => this._onerror(new Error(`Failed to send cancellation: ${error}`)));
|
|
// Wrap the reason in an McpError if it isn't already
|
|
const error = reason instanceof McpError ? reason : new McpError(ErrorCode.RequestTimeout, String(reason));
|
|
reject(error);
|
|
};
|
|
this._responseHandlers.set(messageId, response => {
|
|
if (options?.signal?.aborted) {
|
|
return;
|
|
}
|
|
if (response instanceof Error) {
|
|
return reject(response);
|
|
}
|
|
try {
|
|
const parseResult = safeParse(resultSchema, response.result);
|
|
if (!parseResult.success) {
|
|
// Type guard: if success is false, error is guaranteed to exist
|
|
reject(parseResult.error);
|
|
}
|
|
else {
|
|
resolve(parseResult.data);
|
|
}
|
|
}
|
|
catch (error) {
|
|
reject(error);
|
|
}
|
|
});
|
|
options?.signal?.addEventListener('abort', () => {
|
|
cancel(options?.signal?.reason);
|
|
});
|
|
const timeout = options?.timeout ?? DEFAULT_REQUEST_TIMEOUT_MSEC;
|
|
const timeoutHandler = () => cancel(McpError.fromError(ErrorCode.RequestTimeout, 'Request timed out', { timeout }));
|
|
this._setupTimeout(messageId, timeout, options?.maxTotalTimeout, timeoutHandler, options?.resetTimeoutOnProgress ?? false);
|
|
// Queue request if related to a task
|
|
const relatedTaskId = relatedTask?.taskId;
|
|
if (relatedTaskId) {
|
|
// Store the response resolver for this request so responses can be routed back
|
|
const responseResolver = (response) => {
|
|
const handler = this._responseHandlers.get(messageId);
|
|
if (handler) {
|
|
handler(response);
|
|
}
|
|
else {
|
|
// Log error when resolver is missing, but don't fail
|
|
this._onerror(new Error(`Response handler missing for side-channeled request ${messageId}`));
|
|
}
|
|
};
|
|
this._requestResolvers.set(messageId, responseResolver);
|
|
this._enqueueTaskMessage(relatedTaskId, {
|
|
type: 'request',
|
|
message: jsonrpcRequest,
|
|
timestamp: Date.now()
|
|
}).catch(error => {
|
|
this._cleanupTimeout(messageId);
|
|
reject(error);
|
|
});
|
|
// Don't send through transport - queued messages are delivered via tasks/result only
|
|
// This prevents duplicate delivery for bidirectional transports
|
|
}
|
|
else {
|
|
// No related task - send through transport normally
|
|
this._transport.send(jsonrpcRequest, { relatedRequestId, resumptionToken, onresumptiontoken }).catch(error => {
|
|
this._cleanupTimeout(messageId);
|
|
reject(error);
|
|
});
|
|
}
|
|
});
|
|
}
|
|
/**
|
|
* Gets the current status of a task.
|
|
*
|
|
* @experimental Use `client.experimental.tasks.getTask()` to access this method.
|
|
*/
|
|
async getTask(params, options) {
|
|
// @ts-expect-error SendRequestT cannot directly contain GetTaskRequest, but we ensure all type instantiations contain it anyways
|
|
return this.request({ method: 'tasks/get', params }, GetTaskResultSchema, options);
|
|
}
|
|
/**
|
|
* Retrieves the result of a completed task.
|
|
*
|
|
* @experimental Use `client.experimental.tasks.getTaskResult()` to access this method.
|
|
*/
|
|
async getTaskResult(params, resultSchema, options) {
|
|
// @ts-expect-error SendRequestT cannot directly contain GetTaskPayloadRequest, but we ensure all type instantiations contain it anyways
|
|
return this.request({ method: 'tasks/result', params }, resultSchema, options);
|
|
}
|
|
/**
|
|
* Lists tasks, optionally starting from a pagination cursor.
|
|
*
|
|
* @experimental Use `client.experimental.tasks.listTasks()` to access this method.
|
|
*/
|
|
async listTasks(params, options) {
|
|
// @ts-expect-error SendRequestT cannot directly contain ListTasksRequest, but we ensure all type instantiations contain it anyways
|
|
return this.request({ method: 'tasks/list', params }, ListTasksResultSchema, options);
|
|
}
|
|
/**
|
|
* Cancels a specific task.
|
|
*
|
|
* @experimental Use `client.experimental.tasks.cancelTask()` to access this method.
|
|
*/
|
|
async cancelTask(params, options) {
|
|
// @ts-expect-error SendRequestT cannot directly contain CancelTaskRequest, but we ensure all type instantiations contain it anyways
|
|
return this.request({ method: 'tasks/cancel', params }, CancelTaskResultSchema, options);
|
|
}
|
|
/**
|
|
* Emits a notification, which is a one-way message that does not expect a response.
|
|
*/
|
|
async notification(notification, options) {
|
|
if (!this._transport) {
|
|
throw new Error('Not connected');
|
|
}
|
|
this.assertNotificationCapability(notification.method);
|
|
// Queue notification if related to a task
|
|
const relatedTaskId = options?.relatedTask?.taskId;
|
|
if (relatedTaskId) {
|
|
// Build the JSONRPC notification with metadata
|
|
const jsonrpcNotification = {
|
|
...notification,
|
|
jsonrpc: '2.0',
|
|
params: {
|
|
...notification.params,
|
|
_meta: {
|
|
...(notification.params?._meta || {}),
|
|
[RELATED_TASK_META_KEY]: options.relatedTask
|
|
}
|
|
}
|
|
};
|
|
await this._enqueueTaskMessage(relatedTaskId, {
|
|
type: 'notification',
|
|
message: jsonrpcNotification,
|
|
timestamp: Date.now()
|
|
});
|
|
// Don't send through transport - queued messages are delivered via tasks/result only
|
|
// This prevents duplicate delivery for bidirectional transports
|
|
return;
|
|
}
|
|
const debouncedMethods = this._options?.debouncedNotificationMethods ?? [];
|
|
// A notification can only be debounced if it's in the list AND it's "simple"
|
|
// (i.e., has no parameters and no related request ID or related task that could be lost).
|
|
const canDebounce = debouncedMethods.includes(notification.method) && !notification.params && !options?.relatedRequestId && !options?.relatedTask;
|
|
if (canDebounce) {
|
|
// If a notification of this type is already scheduled, do nothing.
|
|
if (this._pendingDebouncedNotifications.has(notification.method)) {
|
|
return;
|
|
}
|
|
// Mark this notification type as pending.
|
|
this._pendingDebouncedNotifications.add(notification.method);
|
|
// Schedule the actual send to happen in the next microtask.
|
|
// This allows all synchronous calls in the current event loop tick to be coalesced.
|
|
Promise.resolve().then(() => {
|
|
// Un-mark the notification so the next one can be scheduled.
|
|
this._pendingDebouncedNotifications.delete(notification.method);
|
|
// SAFETY CHECK: If the connection was closed while this was pending, abort.
|
|
if (!this._transport) {
|
|
return;
|
|
}
|
|
let jsonrpcNotification = {
|
|
...notification,
|
|
jsonrpc: '2.0'
|
|
};
|
|
// Augment with related task metadata if relatedTask is provided
|
|
if (options?.relatedTask) {
|
|
jsonrpcNotification = {
|
|
...jsonrpcNotification,
|
|
params: {
|
|
...jsonrpcNotification.params,
|
|
_meta: {
|
|
...(jsonrpcNotification.params?._meta || {}),
|
|
[RELATED_TASK_META_KEY]: options.relatedTask
|
|
}
|
|
}
|
|
};
|
|
}
|
|
// Send the notification, but don't await it here to avoid blocking.
|
|
// Handle potential errors with a .catch().
|
|
this._transport?.send(jsonrpcNotification, options).catch(error => this._onerror(error));
|
|
});
|
|
// Return immediately.
|
|
return;
|
|
}
|
|
let jsonrpcNotification = {
|
|
...notification,
|
|
jsonrpc: '2.0'
|
|
};
|
|
// Augment with related task metadata if relatedTask is provided
|
|
if (options?.relatedTask) {
|
|
jsonrpcNotification = {
|
|
...jsonrpcNotification,
|
|
params: {
|
|
...jsonrpcNotification.params,
|
|
_meta: {
|
|
...(jsonrpcNotification.params?._meta || {}),
|
|
[RELATED_TASK_META_KEY]: options.relatedTask
|
|
}
|
|
}
|
|
};
|
|
}
|
|
await this._transport.send(jsonrpcNotification, options);
|
|
}
|
|
/**
|
|
* Registers a handler to invoke when this protocol object receives a request with the given method.
|
|
*
|
|
* Note that this will replace any previous request handler for the same method.
|
|
*/
|
|
setRequestHandler(requestSchema, handler) {
|
|
const method = getMethodLiteral(requestSchema);
|
|
this.assertRequestHandlerCapability(method);
|
|
this._requestHandlers.set(method, (request, extra) => {
|
|
const parsed = parseWithCompat(requestSchema, request);
|
|
return Promise.resolve(handler(parsed, extra));
|
|
});
|
|
}
|
|
/**
|
|
* Removes the request handler for the given method.
|
|
*/
|
|
removeRequestHandler(method) {
|
|
this._requestHandlers.delete(method);
|
|
}
|
|
/**
|
|
* Asserts that a request handler has not already been set for the given method, in preparation for a new one being automatically installed.
|
|
*/
|
|
assertCanSetRequestHandler(method) {
|
|
if (this._requestHandlers.has(method)) {
|
|
throw new Error(`A request handler for ${method} already exists, which would be overridden`);
|
|
}
|
|
}
|
|
/**
|
|
* Registers a handler to invoke when this protocol object receives a notification with the given method.
|
|
*
|
|
* Note that this will replace any previous notification handler for the same method.
|
|
*/
|
|
setNotificationHandler(notificationSchema, handler) {
|
|
const method = getMethodLiteral(notificationSchema);
|
|
this._notificationHandlers.set(method, notification => {
|
|
const parsed = parseWithCompat(notificationSchema, notification);
|
|
return Promise.resolve(handler(parsed));
|
|
});
|
|
}
|
|
/**
|
|
* Removes the notification handler for the given method.
|
|
*/
|
|
removeNotificationHandler(method) {
|
|
this._notificationHandlers.delete(method);
|
|
}
|
|
/**
|
|
* Cleans up the progress handler associated with a task.
|
|
* This should be called when a task reaches a terminal status.
|
|
*/
|
|
_cleanupTaskProgressHandler(taskId) {
|
|
const progressToken = this._taskProgressTokens.get(taskId);
|
|
if (progressToken !== undefined) {
|
|
this._progressHandlers.delete(progressToken);
|
|
this._taskProgressTokens.delete(taskId);
|
|
}
|
|
}
|
|
/**
|
|
* Enqueues a task-related message for side-channel delivery via tasks/result.
|
|
* @param taskId The task ID to associate the message with
|
|
* @param message The message to enqueue
|
|
* @param sessionId Optional session ID for binding the operation to a specific session
|
|
* @throws Error if taskStore is not configured or if enqueue fails (e.g., queue overflow)
|
|
*
|
|
* Note: If enqueue fails, it's the TaskMessageQueue implementation's responsibility to handle
|
|
* the error appropriately (e.g., by failing the task, logging, etc.). The Protocol layer
|
|
* simply propagates the error.
|
|
*/
|
|
async _enqueueTaskMessage(taskId, message, sessionId) {
|
|
// Task message queues are only used when taskStore is configured
|
|
if (!this._taskStore || !this._taskMessageQueue) {
|
|
throw new Error('Cannot enqueue task message: taskStore and taskMessageQueue are not configured');
|
|
}
|
|
const maxQueueSize = this._options?.maxTaskQueueSize;
|
|
await this._taskMessageQueue.enqueue(taskId, message, sessionId, maxQueueSize);
|
|
}
|
|
/**
|
|
* Clears the message queue for a task and rejects any pending request resolvers.
|
|
* @param taskId The task ID whose queue should be cleared
|
|
* @param sessionId Optional session ID for binding the operation to a specific session
|
|
*/
|
|
async _clearTaskQueue(taskId, sessionId) {
|
|
if (this._taskMessageQueue) {
|
|
// Reject any pending request resolvers
|
|
const messages = await this._taskMessageQueue.dequeueAll(taskId, sessionId);
|
|
for (const message of messages) {
|
|
if (message.type === 'request' && isJSONRPCRequest(message.message)) {
|
|
// Extract request ID from the message
|
|
const requestId = message.message.id;
|
|
const resolver = this._requestResolvers.get(requestId);
|
|
if (resolver) {
|
|
resolver(new McpError(ErrorCode.InternalError, 'Task cancelled or completed'));
|
|
this._requestResolvers.delete(requestId);
|
|
}
|
|
else {
|
|
// Log error when resolver is missing during cleanup for better observability
|
|
this._onerror(new Error(`Resolver missing for request ${requestId} during task ${taskId} cleanup`));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
/**
|
|
* Waits for a task update (new messages or status change) with abort signal support.
|
|
* Uses polling to check for updates at the task's configured poll interval.
|
|
* @param taskId The task ID to wait for
|
|
* @param signal Abort signal to cancel the wait
|
|
* @returns Promise that resolves when an update occurs or rejects if aborted
|
|
*/
|
|
async _waitForTaskUpdate(taskId, signal) {
|
|
// Get the task's poll interval, falling back to default
|
|
let interval = this._options?.defaultTaskPollInterval ?? 1000;
|
|
try {
|
|
const task = await this._taskStore?.getTask(taskId);
|
|
if (task?.pollInterval) {
|
|
interval = task.pollInterval;
|
|
}
|
|
}
|
|
catch {
|
|
// Use default interval if task lookup fails
|
|
}
|
|
return new Promise((resolve, reject) => {
|
|
if (signal.aborted) {
|
|
reject(new McpError(ErrorCode.InvalidRequest, 'Request cancelled'));
|
|
return;
|
|
}
|
|
// Wait for the poll interval, then resolve so caller can check for updates
|
|
const timeoutId = setTimeout(resolve, interval);
|
|
// Clean up timeout and reject if aborted
|
|
signal.addEventListener('abort', () => {
|
|
clearTimeout(timeoutId);
|
|
reject(new McpError(ErrorCode.InvalidRequest, 'Request cancelled'));
|
|
}, { once: true });
|
|
});
|
|
}
|
|
requestTaskStore(request, sessionId) {
|
|
const taskStore = this._taskStore;
|
|
if (!taskStore) {
|
|
throw new Error('No task store configured');
|
|
}
|
|
return {
|
|
createTask: async (taskParams) => {
|
|
if (!request) {
|
|
throw new Error('No request provided');
|
|
}
|
|
return await taskStore.createTask(taskParams, request.id, {
|
|
method: request.method,
|
|
params: request.params
|
|
}, sessionId);
|
|
},
|
|
getTask: async (taskId) => {
|
|
const task = await taskStore.getTask(taskId, sessionId);
|
|
if (!task) {
|
|
throw new McpError(ErrorCode.InvalidParams, 'Failed to retrieve task: Task not found');
|
|
}
|
|
return task;
|
|
},
|
|
storeTaskResult: async (taskId, status, result) => {
|
|
await taskStore.storeTaskResult(taskId, status, result, sessionId);
|
|
// Get updated task state and send notification
|
|
const task = await taskStore.getTask(taskId, sessionId);
|
|
if (task) {
|
|
const notification = TaskStatusNotificationSchema.parse({
|
|
method: 'notifications/tasks/status',
|
|
params: task
|
|
});
|
|
await this.notification(notification);
|
|
if (isTerminal(task.status)) {
|
|
this._cleanupTaskProgressHandler(taskId);
|
|
// Don't clear queue here - it will be cleared after delivery via tasks/result
|
|
}
|
|
}
|
|
},
|
|
getTaskResult: taskId => {
|
|
return taskStore.getTaskResult(taskId, sessionId);
|
|
},
|
|
updateTaskStatus: async (taskId, status, statusMessage) => {
|
|
// Check if task exists
|
|
const task = await taskStore.getTask(taskId, sessionId);
|
|
if (!task) {
|
|
throw new McpError(ErrorCode.InvalidParams, `Task "${taskId}" not found - it may have been cleaned up`);
|
|
}
|
|
// Don't allow transitions from terminal states
|
|
if (isTerminal(task.status)) {
|
|
throw new McpError(ErrorCode.InvalidParams, `Cannot update task "${taskId}" from terminal status "${task.status}" to "${status}". Terminal states (completed, failed, cancelled) cannot transition to other states.`);
|
|
}
|
|
await taskStore.updateTaskStatus(taskId, status, statusMessage, sessionId);
|
|
// Get updated task state and send notification
|
|
const updatedTask = await taskStore.getTask(taskId, sessionId);
|
|
if (updatedTask) {
|
|
const notification = TaskStatusNotificationSchema.parse({
|
|
method: 'notifications/tasks/status',
|
|
params: updatedTask
|
|
});
|
|
await this.notification(notification);
|
|
if (isTerminal(updatedTask.status)) {
|
|
this._cleanupTaskProgressHandler(taskId);
|
|
// Don't clear queue here - it will be cleared after delivery via tasks/result
|
|
}
|
|
}
|
|
},
|
|
listTasks: cursor => {
|
|
return taskStore.listTasks(cursor, sessionId);
|
|
}
|
|
};
|
|
}
|
|
}
|
|
function isPlainObject(value) {
|
|
return value !== null && typeof value === 'object' && !Array.isArray(value);
|
|
}
|
|
export function mergeCapabilities(base, additional) {
|
|
const result = { ...base };
|
|
for (const key in additional) {
|
|
const k = key;
|
|
const addValue = additional[k];
|
|
if (addValue === undefined)
|
|
continue;
|
|
const baseValue = result[k];
|
|
if (isPlainObject(baseValue) && isPlainObject(addValue)) {
|
|
result[k] = { ...baseValue, ...addValue };
|
|
}
|
|
else {
|
|
result[k] = addValue;
|
|
}
|
|
}
|
|
return result;
|
|
}
|
|
//# sourceMappingURL=protocol.js.map
|