title: 'Building Custom Transports' description: 'Extending MCP with custom transport implementations'
Building Custom Transports
While MCP provides stdio, SSE, and HTTP transports out of the box, you may need custom transports for specific use cases. In this lesson, we'll explore when and how to build custom transports.
When to Build a Custom Transport
Consider a custom transport when:
- WebSockets: You need bidirectional, full-duplex communication
- gRPC: You want strongly-typed, efficient RPC
- Message Queues: You need asynchronous, queued communication (RabbitMQ, SQS)
- Custom Protocols: You have existing infrastructure to leverage
- Performance: You need optimizations for specific scenarios
Transport Interface
MCP transports must implement the transport interface:
interface Transport {
// Start the transport
start(): Promise<void>;
// Send a message
send(message: JSONRPCMessage): Promise<void>;
// Receive messages (callback-based)
onMessage(callback: (message: JSONRPCMessage) => void): void;
// Handle errors
onError(callback: (error: Error) => void): void;
// Close the transport
close(): Promise<void>;
}
Example: WebSocket Transport
Build a WebSocket transport for bidirectional communication:
import WebSocket from 'ws';
import { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';
import { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js';
export class WebSocketTransport implements Transport {
private ws: WebSocket;
private messageCallback?: (message: JSONRPCMessage) => void;
private errorCallback?: (error: Error) => void;
constructor(url: string) {
this.ws = new WebSocket(url);
}
async start(): Promise<void> {
return new Promise((resolve, reject) => {
this.ws.on('open', () => {
resolve();
});
this.ws.on('error', (error) => {
reject(error);
});
this.ws.on('message', (data: string) => {
try {
const message = JSON.parse(data) as JSONRPCMessage;
if (this.messageCallback) {
this.messageCallback(message);
}
} catch (error) {
if (this.errorCallback) {
this.errorCallback(error as Error);
}
}
});
this.ws.on('error', (error) => {
if (this.errorCallback) {
this.errorCallback(error);
}
});
});
}
async send(message: JSONRPCMessage): Promise<void> {
return new Promise((resolve, reject) => {
this.ws.send(JSON.stringify(message), (error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
});
}
onMessage(callback: (message: JSONRPCMessage) => void): void {
this.messageCallback = callback;
}
onError(callback: (error: Error) => void): void {
this.errorCallback = callback;
}
async close(): Promise<void> {
return new Promise((resolve) => {
this.ws.close();
this.ws.on('close', () => {
resolve();
});
});
}
}
Using the WebSocket Transport
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { WebSocketTransport } from './websocket-transport.js';
const server = new Server(
{ name: 'ws-server', version: '1.0.0' },
{ capabilities: { tools: {} } }
);
// Register handlers...
// Connect with WebSocket transport
const transport = new WebSocketTransport('ws://localhost:8080');
await transport.start();
await server.connect(transport);
Example: Redis Pub/Sub Transport
For asynchronous, message queue-based communication:
import { Redis } from 'ioredis';
import { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';
export class RedisPubSubTransport implements Transport {
private pubClient: Redis;
private subClient: Redis;
private channelPrefix: string;
private serverId: string;
private messageCallback?: (message: JSONRPCMessage) => void;
constructor(serverId: string, redisUrl: string) {
this.serverId = serverId;
this.channelPrefix = 'mcp:';
this.pubClient = new Redis(redisUrl);
this.subClient = new Redis(redisUrl);
}
async start(): Promise<void> {
// Subscribe to messages for this server
await this.subClient.subscribe(`${this.channelPrefix}${this.serverId}`);
this.subClient.on('message', (channel, message) => {
try {
const parsed = JSON.parse(message);
if (this.messageCallback) {
this.messageCallback(parsed);
}
} catch (error) {
console.error('Failed to parse message:', error);
}
});
}
async send(message: JSONRPCMessage): Promise<void> {
// Publish to client channel (reverse of server ID)
const targetChannel = `${this.channelPrefix}client`;
await this.pubClient.publish(targetChannel, JSON.stringify(message));
}
onMessage(callback: (message: JSONRPCMessage) => void): void {
this.messageCallback = callback;
}
onError(callback: (error: Error) => void): void {
this.pubClient.on('error', callback);
this.subClient.on('error', callback);
}
async close(): Promise<void> {
await this.subClient.unsubscribe();
await this.pubClient.quit();
await this.subClient.quit();
}
}
Example: Unix Domain Socket Transport
For high-performance local IPC:
import * as net from 'net';
import { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';
export class UnixSocketTransport implements Transport {
private socket: net.Socket;
private socketPath: string;
private buffer: string = '';
constructor(socketPath: string) {
this.socketPath = socketPath;
this.socket = new net.Socket();
}
async start(): Promise<void> {
return new Promise((resolve, reject) => {
this.socket.connect(this.socketPath, () => {
resolve();
});
this.socket.on('error', reject);
this.socket.on('data', (data: Buffer) => {
this.buffer += data.toString();
// Process complete JSON-RPC messages (line-delimited)
const lines = this.buffer.split('\n');
this.buffer = lines.pop() || '';
for (const line of lines) {
if (line.trim()) {
try {
const message = JSON.parse(line);
if (this.messageCallback) {
this.messageCallback(message);
}
} catch (error) {
console.error('Failed to parse message:', error);
}
}
}
});
});
}
async send(message: JSONRPCMessage): Promise<void> {
return new Promise((resolve, reject) => {
const data = JSON.stringify(message) + '\n';
this.socket.write(data, (error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
});
}
onMessage(callback: (message: JSONRPCMessage) => void): void {
this.messageCallback = callback;
}
onError(callback: (error: Error) => void): void {
this.socket.on('error', callback);
}
async close(): Promise<void> {
return new Promise((resolve) => {
this.socket.end(() => {
resolve();
});
});
}
}
Transport Features to Consider
Reconnection
Implement automatic reconnection for resilient transports:
class ResilientTransport implements Transport {
private innerTransport: Transport;
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
async start(): Promise<void> {
while (this.reconnectAttempts < this.maxReconnectAttempts) {
try {
await this.innerTransport.start();
this.reconnectAttempts = 0; // Reset on success
return;
} catch (error) {
this.reconnectAttempts++;
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
throw new Error('Failed to connect after maximum retry attempts');
}
}
Message Queuing
Buffer messages when connection is unavailable:
class QueuedTransport implements Transport {
private queue: JSONRPCMessage[] = [];
private connected = false;
async send(message: JSONRPCMessage): Promise<void> {
if (!this.connected) {
this.queue.push(message);
return;
}
// Send immediately if connected
await this.innerTransport.send(message);
}
private async flushQueue() {
while (this.queue.length > 0) {
const message = this.queue.shift()!;
await this.innerTransport.send(message);
}
}
}
Compression
Add compression for large messages:
import { gzip, gunzip } from 'zlib';
import { promisify } from 'util';
const gzipAsync = promisify(gzip);
const gunzipAsync = promisify(gunzip);
class CompressedTransport implements Transport {
async send(message: JSONRPCMessage): Promise<void> {
const json = JSON.stringify(message);
// Only compress if message is large
if (json.length > 1024) {
const compressed = await gzipAsync(Buffer.from(json));
await this.innerTransport.send({
...message,
_compressed: compressed.toString('base64')
});
} else {
await this.innerTransport.send(message);
}
}
onMessage(callback: (message: JSONRPCMessage) => void): void {
this.innerTransport.onMessage(async (message: any) => {
if (message._compressed) {
const buffer = Buffer.from(message._compressed, 'base64');
const decompressed = await gunzipAsync(buffer);
const original = JSON.parse(decompressed.toString());
callback(original);
} else {
callback(message);
}
});
}
}
Testing Custom Transports
Test transports thoroughly:
import { describe, it, expect } from 'vitest';
describe('WebSocketTransport', () => {
it('should send and receive messages', async () => {
const transport = new WebSocketTransport('ws://localhost:8080');
await transport.start();
const receivedMessages: JSONRPCMessage[] = [];
transport.onMessage((message) => {
receivedMessages.push(message);
});
const testMessage = {
jsonrpc: '2.0',
id: 1,
method: 'test'
};
await transport.send(testMessage);
// Wait for message
await new Promise(resolve => setTimeout(resolve, 100));
expect(receivedMessages).toContainEqual(testMessage);
await transport.close();
});
});
Best Practices
- Handle errors gracefully: Network errors, parsing errors, connection drops
- Implement reconnection: For unstable connections
- Buffer messages: Don't lose messages during brief disconnections
- Add timeouts: Prevent indefinite waits
- Test thoroughly: Network issues are hard to reproduce
- Document behavior: How does your transport handle failures?
- Monitor performance: Track latency, throughput, error rates
Custom transports unlock new deployment models and performance characteristics for MCP servers. In the next lesson, we'll explore enterprise integration patterns.