title: 'Building Custom Transports'
description: 'Extending MCP with custom transport implementations'

Building Custom Transports

While MCP provides stdio, SSE, and HTTP transports out of the box, you may need custom transports for specific use cases. In this lesson, we'll explore when and how to build custom transports.

When to Build a Custom Transport

Consider a custom transport when:

  • WebSockets: You need bidirectional, full-duplex communication
  • gRPC: You want strongly-typed, efficient RPC
  • Message Queues: You need asynchronous, queued communication (RabbitMQ, SQS)
  • Custom Protocols: You have existing infrastructure to leverage
  • Performance: You need optimizations for specific scenarios

Transport Interface

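MCP transports implement the SDK's Transport interface. Besides start(), send(), and close(), a transport exposes optional callback properties. The Client or Server assigns them when it connects, and the transport invokes them as events occur. A simplified view of the interface:

interface Transport {
  // Start the transport (Client/Server connect() calls this for you)
  start(): Promise<void>;

  // Send a message
  send(message: JSONRPCMessage): Promise<void>;

  // Close the transport
  close(): Promise<void>;

  // Called by the transport for each message it receives
  onmessage?: (message: JSONRPCMessage) => void;

  // Called by the transport when an error occurs
  onerror?: (error: Error) => void;

  // Called by the transport when the connection closes
  onclose?: () => void;
}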
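
To make the contract concrete, here is a minimal sketch of an in-memory transport pair. It uses the callback-property form (onmessage, onerror, onclose) found in the TypeScript SDK's Transport type. The JSONRPCMessage and Transport types below are simplified local stand-ins, and LoopbackTransport is a hypothetical name, so treat this as an illustration rather than SDK code.

```typescript
// Simplified local stand-ins for the SDK types (assumption, not the real definitions).
type JSONRPCMessage = {
  jsonrpc: '2.0';
  id?: number | string;
  method?: string;
  params?: unknown;
  result?: unknown;
};

interface Transport {
  start(): Promise<void>;
  send(message: JSONRPCMessage): Promise<void>;
  close(): Promise<void>;
  onmessage?: (message: JSONRPCMessage) => void;
  onerror?: (error: Error) => void;
  onclose?: () => void;
}

// Hypothetical in-memory transport: whatever one end sends, the other end receives.
class LoopbackTransport implements Transport {
  private peer?: LoopbackTransport;
  private started = false;
  private pending: JSONRPCMessage[] = []; // messages that arrived before start()

  onmessage?: (message: JSONRPCMessage) => void;
  onerror?: (error: Error) => void;
  onclose?: () => void;

  // Create two transports wired to each other.
  static createPair(): [LoopbackTransport, LoopbackTransport] {
    const a = new LoopbackTransport();
    const b = new LoopbackTransport();
    a.peer = b;
    b.peer = a;
    return [a, b];
  }

  async start(): Promise<void> {
    this.started = true;
    // Deliver anything the peer sent before this end was started.
    for (const message of this.pending.splice(0)) {
      this.onmessage?.(message);
    }
  }

  async send(message: JSONRPCMessage): Promise<void> {
    if (!this.peer) throw new Error('Transport is closed');
    this.peer.receive(message);
  }

  private receive(message: JSONRPCMessage): void {
    if (this.started) this.onmessage?.(message);
    else this.pending.push(message);
  }

  async close(): Promise<void> {
    const peer = this.peer;
    if (!peer) return; // already closed
    this.peer = undefined;
    peer.peer = undefined;
    // Notify both ends that the connection is gone.
    this.onclose?.();
    peer.onclose?.();
  }
}
```

A pair like this is mostly useful in tests: one end is handed to a server, the other to a client, and no network is involved.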

Example: WebSocket Transport

Build a WebSocket transport for bidirectional communication:

import WebSocket from 'ws';
import { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';
import { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js';

export class WebSocketTransport implements Transport {
  private ws: WebSocket;

  // Callback properties from the SDK's Transport interface.
  // Client/Server.connect() assigns them before calling start().
  onmessage?: (message: JSONRPCMessage) => void;
  onerror?: (error: Error) => void;
  onclose?: () => void;

  constructor(url: string) {
    this.ws = new WebSocket(url);
  }

  async start(): Promise<void> {
    return new Promise((resolve, reject) => {
      this.ws.once('open', () => resolve());

      // One handler covers both cases: it fails start() if the connection
      // never opens (a no-op once the promise has settled) and reports the error.
      this.ws.on('error', (error) => {
        reject(error);
        this.onerror?.(error);
      });

      this.ws.on('message', (data) => {
        try {
          // ws delivers a Buffer by default, so convert it before parsing
          const message = JSON.parse(data.toString()) as JSONRPCMessage;
          this.onmessage?.(message);
        } catch (error) {
          this.onerror?.(error as Error);
        }
      });

      this.ws.on('close', () => this.onclose?.());
    });
  }

  async send(message: JSONRPCMessage): Promise<void> {
    return new Promise((resolve, reject) => {
      this.ws.send(JSON.stringify(message), (error) => {
        if (error) {
          reject(error);
        } else {
          resolve();
        }
      });
    });
  }

  async close(): Promise<void> {
    if (this.ws.readyState === WebSocket.CLOSED) return;
    return new Promise((resolve) => {
      // Listen before closing so the 'close' event cannot be missed
      this.ws.once('close', () => resolve());
      this.ws.close();
    });
  }
}

Using the WebSocket Transport

import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { WebSocketTransport } from './websocket-transport.js';

const server = new Server(
  { name: 'ws-server', version: '1.0.0' },
  { capabilities: { tools: {} } }
);

// Register handlers...

// Connect with the WebSocket transport.
// server.connect() calls transport.start() itself, so don't start it manually.
const transport = new WebSocketTransport('ws://localhost:8080');
await server.connect(transport);

Example: Redis Pub/Sub Transport

For asynchronous, broker-based communication. Note that Redis Pub/Sub is fire-and-forget: a message published while no subscriber is listening is lost.

import { Redis } from 'ioredis';
import { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';
import { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js';

export class RedisPubSubTransport implements Transport {
  private pubClient: Redis;
  private subClient: Redis;
  private channelPrefix: string;
  private serverId: string;

  // Assigned by Client/Server.connect() before start() is called
  onmessage?: (message: JSONRPCMessage) => void;
  onerror?: (error: Error) => void;
  onclose?: () => void;

  constructor(serverId: string, redisUrl: string) {
    this.serverId = serverId;
    this.channelPrefix = 'mcp:';
    this.pubClient = new Redis(redisUrl);
    this.subClient = new Redis(redisUrl);
  }

  async start(): Promise<void> {
    this.pubClient.on('error', (error) => this.onerror?.(error));
    this.subClient.on('error', (error) => this.onerror?.(error));

    // Attach the handler before subscribing so no message is missed
    this.subClient.on('message', (_channel, message) => {
      try {
        this.onmessage?.(JSON.parse(message) as JSONRPCMessage);
      } catch (error) {
        this.onerror?.(error as Error);
      }
    });

    // Subscribe to messages for this server
    await this.subClient.subscribe(`${this.channelPrefix}${this.serverId}`);
  }

  async send(message: JSONRPCMessage): Promise<void> {
    // Publish to the client's channel. A single fixed channel name is used
    // here, which only works with one client per server.
    const targetChannel = `${this.channelPrefix}client`;
    await this.pubClient.publish(targetChannel, JSON.stringify(message));
  }

  async close(): Promise<void> {
    await this.subClient.unsubscribe();
    await this.pubClient.quit();
    await this.subClient.quit();
    this.onclose?.();
  }
}
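
The example publishes every outgoing message to one fixed client channel. One possible way to support several peers is to derive a pair of channel names from a session identifier, so each side subscribes to its own inbox and publishes to the other side's. The sketch below is only an illustration: channelsFor, the inbox/outbox names, and the ":to-server"/":to-client" suffixes are all assumptions, not part of MCP or ioredis.

```typescript
type Role = 'server' | 'client';

interface ChannelPair {
  inbox: string;  // channel this side subscribes to
  outbox: string; // channel this side publishes to
}

// Hypothetical naming scheme: two channels per session, one for each direction.
function channelsFor(role: Role, sessionId: string, prefix = 'mcp:'): ChannelPair {
  const toServer = `${prefix}${sessionId}:to-server`;
  const toClient = `${prefix}${sessionId}:to-client`;
  return role === 'server'
    ? { inbox: toServer, outbox: toClient }
    : { inbox: toClient, outbox: toServer };
}

const serverChannels = channelsFor('server', 'session-42');
const clientChannels = channelsFor('client', 'session-42');
// serverChannels.outbox and clientChannels.inbox are both 'mcp:session-42:to-client'
```

With a scheme like this, a transport would subscribe to its inbox in start() and publish to its outbox in send().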

Example: Unix Domain Socket Transport

For high-performance local IPC:

import * as net from 'net';
import { StringDecoder } from 'string_decoder';
import { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';
import { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js';

export class UnixSocketTransport implements Transport {
  private socket: net.Socket;
  private socketPath: string;
  private buffer: string = '';
  // Keeps multi-byte UTF-8 characters intact when they are split across chunks
  private decoder = new StringDecoder('utf8');

  // Assigned by Client/Server.connect() before start() is called
  onmessage?: (message: JSONRPCMessage) => void;
  onerror?: (error: Error) => void;
  onclose?: () => void;

  constructor(socketPath: string) {
    this.socketPath = socketPath;
    this.socket = new net.Socket();
  }

  async start(): Promise<void> {
    return new Promise((resolve, reject) => {
      this.socket.on('error', (error) => {
        reject(error); // no-op once start() has resolved
        this.onerror?.(error);
      });

      this.socket.on('close', () => this.onclose?.());

      this.socket.on('data', (data: Buffer) => {
        this.buffer += this.decoder.write(data);

        // Process complete JSON-RPC messages (line-delimited)
        const lines = this.buffer.split('\n');
        this.buffer = lines.pop() ?? '';

        for (const line of lines) {
          if (line.trim()) {
            try {
              this.onmessage?.(JSON.parse(line) as JSONRPCMessage);
            } catch (error) {
              this.onerror?.(error as Error);
            }
          }
        }
      });

      this.socket.connect(this.socketPath, () => {
        resolve();
      });
    });
  }

  async send(message: JSONRPCMessage): Promise<void> {
    return new Promise((resolve, reject) => {
      const data = JSON.stringify(message) + '\n';
      this.socket.write(data, (error) => {
        if (error) {
          reject(error);
        } else {
          resolve();
        }
      });
    });
  }

  async close(): Promise<void> {
    return new Promise((resolve) => {
      this.socket.end(() => {
        resolve();
      });
    });
  }
}
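
The line-delimited framing in the data handler is the part most worth testing on its own, because a stream socket may deliver half a message or several messages in one chunk. The sketch below isolates that logic in a small helper. LineFramer is a hypothetical name, and it assumes the chunks have already been decoded to strings.

```typescript
// Hypothetical helper: accumulates chunks and returns the complete lines seen so far.
class LineFramer {
  private buffer = '';

  push(chunk: string): string[] {
    this.buffer += chunk;
    const lines = this.buffer.split('\n');
    // The last element is an incomplete line (or '' if the chunk ended with '\n').
    this.buffer = lines.pop() ?? '';
    // Drop blank lines and stray '\r' characters.
    return lines.map((line) => line.trim()).filter((line) => line.length > 0);
  }
}

const framer = new LineFramer();

// A message split across two chunks, followed by a second message in the same chunk.
const first = framer.push('{"jsonrpc":"2.0","id":1,"meth');
const second = framer.push('od":"ping"}\n{"jsonrpc":"2.0","id":2,"method":"pong"}\n');

console.log(first.length);  // 0
console.log(second.length); // 2
```

Keeping the framing separate from the socket code means it can be unit-tested without opening a socket.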

Transport Features to Consider

Reconnection

Implement automatic reconnection for resilient transports:

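class ResilientTransport implements Transport {
  private inner?: Transport;
  private maxReconnectAttempts = 5;

  onmessage?: (message: JSONRPCMessage) => void;
  onerror?: (error: Error) => void;
  onclose?: () => void;

  // Takes a factory because a transport whose start() failed usually cannot be restarted
  constructor(private createTransport: () => Transport) {}

  async start(): Promise<void> {
    for (let attempt = 0; attempt < this.maxReconnectAttempts; attempt++) {
      const inner = this.createTransport();
      inner.onmessage = (message) => this.onmessage?.(message);
      try {
        await inner.start();
        // Forward errors and close events only from the transport that connected
        inner.onerror = (error) => this.onerror?.(error);
        inner.onclose = () => this.onclose?.();
        this.inner = inner;
        return;
      } catch {
        if (attempt === this.maxReconnectAttempts - 1) break; // last attempt: don't wait
        // Exponential backoff: 1s, 2s, 4s, 8s (capped at 30s)
        const delay = Math.min(1000 * Math.pow(2, attempt), 30000);
        await new Promise(resolve => setTimeout(resolve, delay));
      }
    }

    throw new Error('Failed to connect after maximum retry attempts');
  }

  async send(message: JSONRPCMessage): Promise<void> {
    if (!this.inner) throw new Error('Transport not started');
    await this.inner.send(message);
  }

  async close(): Promise<void> {
    await this.inner?.close();
  }
}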
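
The delay calculation can be pulled out into a pure function, which makes the schedule easy to inspect and test. The sketch below uses the same base (1 second) and cap (30 seconds) as the example. The function names and the jitter step are assumptions added for illustration: jitter is a common way to keep many clients from retrying at the same instant.

```typescript
// Exponential backoff: baseMs * 2^attempt, capped at maxMs. `attempt` starts at 0.
function backoffDelay(attempt: number, baseMs = 1000, maxMs = 30000): number {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}

// Hypothetical jitter step: scales the delay to a random value between 50% and 100%.
// `random` is injectable so the function can be tested deterministically.
function withJitter(delayMs: number, random: () => number = Math.random): number {
  return Math.floor(delayMs * (0.5 + random() * 0.5));
}

const schedule = [0, 1, 2, 3, 4, 5].map((attempt) => backoffDelay(attempt));
console.log(schedule); // [1000, 2000, 4000, 8000, 16000, 30000]
```

A retry loop would then wait for withJitter(backoffDelay(attempt)) milliseconds between attempts.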

Message Queuing

Buffer messages while the connection is unavailable:

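class QueuedTransport implements Transport {
  private queue: JSONRPCMessage[] = [];
  private connected = false;

  onmessage?: (message: JSONRPCMessage) => void;
  onerror?: (error: Error) => void;
  onclose?: () => void;

  constructor(private innerTransport: Transport) {}

  async start(): Promise<void> {
    this.innerTransport.onmessage = (message) => this.onmessage?.(message);
    this.innerTransport.onerror = (error) => this.onerror?.(error);
    this.innerTransport.onclose = () => { this.connected = false; this.onclose?.(); };
    await this.innerTransport.start();
    await this.flushQueue(); // send anything queued before the connection was up
  }

  async send(message: JSONRPCMessage): Promise<void> {
    if (!this.connected) {
      this.queue.push(message);
      return;
    }

    // Send immediately if connected
    await this.innerTransport.send(message);
  }

  private async flushQueue() {
    while (this.queue.length > 0) {
      // Dequeue only after a successful send so a failure does not lose the message
      await this.innerTransport.send(this.queue[0]);
      this.queue.shift();
    }
    this.connected = true; // set only once the queue is empty, to preserve ordering
  }

  async close(): Promise<void> {
    await this.innerTransport.close();
  }
}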
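
An unbounded queue can grow without limit during a long outage. The sketch below shows one way to cap it. BoundedQueue is a hypothetical helper, and the drop-oldest policy is an assumption: for JSON-RPC requests it may be preferable to reject new messages instead, so the caller learns that the request was never sent.

```typescript
// Hypothetical FIFO queue with a fixed capacity that discards the oldest item when full.
class BoundedQueue<T> {
  private items: T[] = [];
  dropped = 0; // number of items discarded because the queue was full

  constructor(private capacity: number) {}

  push(item: T): void {
    if (this.items.length >= this.capacity) {
      this.items.shift();
      this.dropped++;
    }
    this.items.push(item);
  }

  // Hands items to `deliver` in FIFO order. Stops at the first item that
  // `deliver` rejects (returns false) and keeps it, and everything after it, queued.
  drain(deliver: (item: T) => boolean): number {
    let sent = 0;
    while (this.items.length > 0 && deliver(this.items[0])) {
      this.items.shift();
      sent++;
    }
    return sent;
  }

  get size(): number {
    return this.items.length;
  }
}

const queue = new BoundedQueue<string>(2);
queue.push('a');
queue.push('b');
queue.push('c'); // queue is full, so 'a' is dropped

const delivered: string[] = [];
const sentCount = queue.drain((item) => {
  delivered.push(item);
  return true;
});
console.log(delivered, sentCount, queue.dropped); // [ 'b', 'c' ] 2 1
```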

Compression

Add compression for large messages:

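import { gzip, gunzip } from 'zlib';
import { promisify } from 'util';

const gzipAsync = promisify(gzip);
const gunzipAsync = promisify(gunzip);

// Wire format for compressed messages. It is not a JSON-RPC message, so both
// ends of the connection must use CompressedTransport.
type CompressedEnvelope = { _compressed: string };

class CompressedTransport implements Transport {
  onmessage?: (message: JSONRPCMessage) => void;
  onerror?: (error: Error) => void;
  onclose?: () => void;

  constructor(private innerTransport: Transport) {}

  async start(): Promise<void> {
    this.innerTransport.onmessage = (message) => { void this.handleIncoming(message); };
    this.innerTransport.onerror = (error) => this.onerror?.(error);
    this.innerTransport.onclose = () => this.onclose?.();
    await this.innerTransport.start();
  }

  async send(message: JSONRPCMessage): Promise<void> {
    const json = JSON.stringify(message);

    // Only compress if the message is large (measured in bytes, not characters)
    if (Buffer.byteLength(json) > 1024) {
      const compressed = await gzipAsync(Buffer.from(json));
      // Send only the envelope, never the original message alongside it
      const envelope: CompressedEnvelope = { _compressed: compressed.toString('base64') };
      await this.innerTransport.send(envelope as unknown as JSONRPCMessage);
    } else {
      await this.innerTransport.send(message);
    }
  }

  private async handleIncoming(message: JSONRPCMessage): Promise<void> {
    try {
      const envelope = message as unknown as Partial<CompressedEnvelope>;
      if (typeof envelope._compressed === 'string') {
        const buffer = Buffer.from(envelope._compressed, 'base64');
        const decompressed = await gunzipAsync(buffer);
        this.onmessage?.(JSON.parse(decompressed.toString()) as JSONRPCMessage);
      } else {
        this.onmessage?.(message);
      }
    } catch (error) {
      this.onerror?.(error as Error);
    }
  }

  async close(): Promise<void> {
    await this.innerTransport.close();
  }
}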
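
The encode and decode steps can be checked in isolation with a round trip. The sketch below uses Node's synchronous zlib functions for brevity and the same 1024-byte threshold as the example. The encode/decode names and the Envelope type are assumptions made for this illustration.

```typescript
import { gzipSync, gunzipSync } from 'zlib';

// Hypothetical wire envelope for a compressed payload.
type Envelope = { _compressed: string };

// Returns the JSON text to put on the wire: the message itself when small,
// or an envelope holding the gzipped, base64-encoded message when large.
function encode(message: object, thresholdBytes = 1024): string {
  const json = JSON.stringify(message);
  if (Buffer.byteLength(json, 'utf8') <= thresholdBytes) return json;
  const envelope: Envelope = { _compressed: gzipSync(json).toString('base64') };
  return JSON.stringify(envelope);
}

// Reverses encode(): unwraps an envelope if present, otherwise returns the parsed message.
function decode(wire: string): unknown {
  const parsed: unknown = JSON.parse(wire);
  if (
    typeof parsed === 'object' &&
    parsed !== null &&
    typeof (parsed as Partial<Envelope>)._compressed === 'string'
  ) {
    const payload = (parsed as Envelope)._compressed;
    return JSON.parse(gunzipSync(Buffer.from(payload, 'base64')).toString('utf8'));
  }
  return parsed;
}

const small = { jsonrpc: '2.0', id: 1, method: 'ping' };
const large = { jsonrpc: '2.0', id: 2, result: { text: 'x'.repeat(5000) } };

const smallWire = encode(small); // sent as-is
const largeWire = encode(large); // sent as an envelope
const roundTripped = decode(largeWire);
```

Base64 adds roughly a third to the compressed size, so compression only pays off for payloads that compress well. Measuring real traffic before choosing a threshold is advisable.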

Testing Custom Transports

Test transports thoroughly:

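import { describe, it, expect } from 'vitest';
import { WebSocketServer } from 'ws';
import { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js';
import { WebSocketTransport } from './websocket-transport.js';

describe('WebSocketTransport', () => {
  it('should send and receive messages', async () => {
    // Echo server: the test needs something on the other end that replies
    const echoServer = new WebSocketServer({ port: 8080 });
    echoServer.on('connection', (socket) => {
      socket.on('message', (data) => socket.send(data.toString()));
    });
    await new Promise((resolve) => echoServer.once('listening', resolve));

    const transport = new WebSocketTransport('ws://localhost:8080');

    // Install the callback before start() so no message is missed
    const receivedMessages: JSONRPCMessage[] = [];
    transport.onmessage = (message) => {
      receivedMessages.push(message);
    };
    await transport.start();

    // Typed as JSONRPCMessage so that jsonrpc is the literal '2.0', not string
    const testMessage: JSONRPCMessage = {
      jsonrpc: '2.0',
      id: 1,
      method: 'test'
    };

    await transport.send(testMessage);

    // Wait for the echo
    await new Promise(resolve => setTimeout(resolve, 100));

    expect(receivedMessages).toContainEqual(testMessage);

    await transport.close();
    await new Promise((resolve) => echoServer.close(resolve));
  });
});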

Best Practices

  1. Handle errors gracefully: Network errors, parsing errors, connection drops
  2. Implement reconnection: For unstable connections
  3. Buffer messages: Don't lose messages during brief disconnections
  4. Add timeouts: Prevent indefinite waits
  5. Test thoroughly: Network issues are hard to reproduce
  6. Document behavior: How does your transport handle failures?
  7. Monitor performance: Track latency, throughput, error rates
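
For the timeout advice, a small wrapper around any promise-returning operation (such as start() or send()) is often enough. The sketch below is one possible approach. withTimeout is a hypothetical helper, not an SDK function, and it only stops waiting: it does not cancel the underlying operation.

```typescript
// Rejects if `promise` does not settle within `ms` milliseconds.
function withTimeout<T>(promise: Promise<T>, ms: number, label = 'operation'): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${label} timed out after ${ms} ms`)), ms);
  });
  // Whichever settles first wins. The timer is always cleared so it cannot
  // keep the process alive after the operation has finished.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Usage sketch: an operation that never settles is abandoned after 10 ms.
async function demo(): Promise<string> {
  try {
    await withTimeout(new Promise<void>(() => {}), 10, 'start');
    return 'connected';
  } catch (error) {
    return (error as Error).message;
  }
}
```

Here demo() resolves to the timeout error message, because the wrapped promise never settles.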

Custom transports unlock new deployment models and performance characteristics for MCP servers. In the next lesson, we'll explore enterprise integration patterns.

Building Custom Transports - Compass | Nick Treffiletti — MCP, AI Agents & Platform Engineering