Extensibility¶
mcp-data-platform is designed for extension. You can add custom toolkits, providers, and middleware to build specialized MCP servers.
Custom Toolkits¶
Create a toolkit that wraps your domain-specific tools.
Toolkit Interface¶
// Toolkit is the contract a custom toolkit implements so the platform can
// identify it, register its tools, hand it providers, and shut it down.
type Toolkit interface {
	// Kind returns a string identifying the toolkit type
	// (the example implementation returns "mytoolkit").
	Kind() string

	// Name returns the name of this toolkit instance.
	Name() string

	// RegisterTools is given the MCP server on which the toolkit adds its tools.
	RegisterTools(s *mcp.Server)

	// Tools returns the names of the tools the toolkit exposes.
	Tools() []string

	// SetSemanticProvider hands the toolkit a semantic provider.
	SetSemanticProvider(provider semantic.Provider)

	// SetQueryProvider hands the toolkit a query provider.
	SetQueryProvider(provider query.Provider)

	// Close releases the toolkit's resources and reports any failure.
	Close() error
}
Example: Custom Toolkit¶
package mytoolkit
import (
"context"
"encoding/json"
"github.com/modelcontextprotocol/go-sdk/mcp"
"github.com/txn2/mcp-data-platform/pkg/query"
"github.com/txn2/mcp-data-platform/pkg/semantic"
)
// Config carries the settings New forwards to NewMyClient.
type Config struct {
	// Endpoint is the first argument passed to NewMyClient.
	Endpoint string

	// APIKey is the second argument passed to NewMyClient.
	APIKey string
}
// Toolkit is the example custom toolkit. It keeps its instance name, the
// configuration it was built from, the backing client, and the providers the
// platform injects through the Set*Provider methods.
type Toolkit struct {
	name   string
	config Config
	client *MyClient

	// Injected after construction; nil until the corresponding setter is called.
	semanticProvider semantic.Provider
	queryProvider    query.Provider
}
// New creates a Toolkit called name whose client is built from cfg.Endpoint
// and cfg.APIKey. If the client cannot be created, the error from NewMyClient
// is returned unchanged and the Toolkit is nil.
func New(name string, cfg Config) (*Toolkit, error) {
	c, err := NewMyClient(cfg.Endpoint, cfg.APIKey)
	if err != nil {
		return nil, err
	}

	tk := &Toolkit{name: name, config: cfg, client: c}
	return tk, nil
}
// Kind returns the fixed type identifier of this toolkit, "mytoolkit".
func (t *Toolkit) Kind() string {
	const kind = "mytoolkit"
	return kind
}
// Name returns the instance name stored on the toolkit.
func (t *Toolkit) Name() string {
	return t.name
}
// RegisterTools adds the "mytoolkit_do_something" tool to s, with
// handleDoSomething as its handler. The tool takes a single required string
// argument named "input".
func (t *Toolkit) RegisterTools(s *mcp.Server) {
	// JSON-schema description of the one accepted argument.
	inputProp := map[string]any{
		"type":        "string",
		"description": "The input to process",
	}

	schema := mcp.ToolInputSchema{
		Type:       "object",
		Properties: map[string]any{"input": inputProp},
		Required:   []string{"input"},
	}

	tool := mcp.Tool{
		Name:        "mytoolkit_do_something",
		Description: "Does something useful",
		InputSchema: schema,
	}

	s.AddTool(tool, t.handleDoSomething)
}
// handleDoSomething serves the "mytoolkit_do_something" tool. It decodes the
// request arguments, passes the "input" value to the client, and returns the
// client's result as text.
//
// Both decoding failures and client failures are reported as tool error
// results; the returned Go error is always nil.
func (t *Toolkit) handleDoSomething(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
	type doSomethingArgs struct {
		Input string `json:"input"`
	}

	// Decode the raw arguments into the expected shape.
	var parsed doSomethingArgs
	err := json.Unmarshal(req.Params.Arguments, &parsed)
	if err != nil {
		return mcp.NewToolResultError(err.Error()), nil
	}

	// Delegate the actual work to the backing client.
	out, err := t.client.DoSomething(ctx, parsed.Input)
	if err != nil {
		return mcp.NewToolResultError(err.Error()), nil
	}

	return mcp.NewToolResultText(out), nil
}
// Tools lists the names of the tools this toolkit exposes.
func (t *Toolkit) Tools() []string {
	names := []string{"mytoolkit_do_something"}
	return names
}
// SetSemanticProvider stores p as the toolkit's semantic provider.
func (t *Toolkit) SetSemanticProvider(p semantic.Provider) {
	t.semanticProvider = p
}
// SetQueryProvider stores p as the toolkit's query provider.
func (t *Toolkit) SetQueryProvider(p query.Provider) {
	t.queryProvider = p
}
// Close closes the toolkit's backing client and returns whatever the client's
// Close reports.
func (t *Toolkit) Close() error {
	err := t.client.Close()
	return err
}
Register Custom Toolkit¶
import (
"github.com/txn2/mcp-data-platform/pkg/platform"
"mycompany/mytoolkit"
)
func main() {
myTk, _ := mytoolkit.New("primary", mytoolkit.Config{
Endpoint: "https://api.example.com",
APIKey: os.Getenv("MY_API_KEY"),
})
p, _ := platform.New(
platform.WithServerName("custom-platform"),
platform.WithCustomToolkit(myTk),
)
p.Run()
}
Custom Providers¶
Create custom semantic, query, or storage providers.
Semantic Provider¶
package mysemanticprovider
import (
"context"
"github.com/txn2/mcp-data-platform/pkg/semantic"
)
// Provider is the example semantic provider. All lookups are delegated to the
// wrapped metadata client.
type Provider struct {
	client *MyMetadataClient
}
// New creates a Provider backed by a metadata client built from endpoint and
// token. If the client cannot be created, the error from NewMyMetadataClient
// is returned unchanged and the Provider is nil.
func New(endpoint, token string) (*Provider, error) {
	c, err := NewMyMetadataClient(endpoint, token)
	if err != nil {
		return nil, err
	}

	provider := &Provider{client: c}
	return provider, nil
}
// Name returns the fixed identifier of this provider, "my-semantic-provider".
func (p *Provider) Name() string {
	const providerName = "my-semantic-provider"
	return providerName
}
// GetTableContext fetches metadata for table (by catalog, schema, and table
// name) from the metadata client and maps it onto a semantic.TableContext.
// A client error is returned unchanged with a nil context.
func (p *Provider) GetTableContext(ctx context.Context, table semantic.TableIdentifier) (*semantic.TableContext, error) {
	md, err := p.client.GetMetadata(ctx, table.Catalog, table.Schema, table.Table)
	if err != nil {
		return nil, err
	}

	// Owners and Domain go through conversion helpers; the other fields are
	// copied across as-is.
	tc := new(semantic.TableContext)
	tc.Description = md.Description
	tc.Owners = convertOwners(md.Owners)
	tc.Tags = md.Tags
	tc.Domain = convertDomain(md.Domain)
	tc.QualityScore = md.QualityScore

	return tc, nil
}
// SearchTables runs filter.Query against the metadata client, limited to
// filter.Limit, and converts each hit into a semantic.TableSearchResult.
// A client error is returned unchanged; when there are no hits the returned
// slice is nil.
func (p *Provider) SearchTables(ctx context.Context, filter semantic.SearchFilter) ([]semantic.TableSearchResult, error) {
	hits, err := p.client.Search(ctx, filter.Query, filter.Limit)
	if err != nil {
		return nil, err
	}

	var found []semantic.TableSearchResult
	for _, hit := range hits {
		entry := semantic.TableSearchResult{
			URN:         hit.URN,
			Name:        hit.Name,
			Description: hit.Description,
			Platform:    hit.Platform,
		}
		found = append(found, entry)
	}

	return found, nil
}
// Implement other interface methods...
// Close closes the provider's metadata client and returns whatever the
// client's Close reports.
func (p *Provider) Close() error {
	err := p.client.Close()
	return err
}
Register Custom Provider¶
import (
"github.com/txn2/mcp-data-platform/pkg/platform"
"mycompany/mysemanticprovider"
)
func main() {
semanticProvider, _ := mysemanticprovider.New(
"https://metadata.example.com",
os.Getenv("METADATA_TOKEN"),
)
p, _ := platform.New(
platform.WithServerName("custom-platform"),
platform.WithTrinoToolkit("primary", trinoCfg),
platform.WithCustomSemanticProvider(semanticProvider),
platform.WithEnrichment(platform.EnrichmentConfig{
TrinoSemanticEnrichment: true,
}),
)
p.Run()
}
Custom Middleware¶
Add request processing logic at the MCP protocol level.
Middleware Interface¶
MCP middleware intercepts requests at the protocol level:
// MCP middleware signatures from the go-sdk.
type (
	// MethodHandler handles one MCP method invocation: it receives the
	// method name and request and returns a result or an error.
	MethodHandler func(ctx context.Context, method string, req mcp.Request) (mcp.Result, error)

	// Middleware wraps the next MethodHandler and returns the wrapped handler.
	Middleware func(next MethodHandler) MethodHandler
)
Example: Logging Middleware¶
package mymiddleware
import (
"context"
"log"
"time"
"github.com/modelcontextprotocol/go-sdk/mcp"
"github.com/txn2/mcp-data-platform/pkg/middleware"
)
// LoggingMiddleware returns MCP middleware that logs the start and the outcome
// (with duration) of every "tools/call" request to logger. Requests for any
// other method are passed through without logging.
//
// NOTE(review): the platform context is used without a nil check — confirm
// GetPlatformContext always yields a usable value for tools/call requests.
func LoggingMiddleware(logger *log.Logger) mcp.Middleware {
	wrap := func(next mcp.MethodHandler) mcp.MethodHandler {
		handler := func(ctx context.Context, method string, req mcp.Request) (mcp.Result, error) {
			// Anything that is not a tool call goes straight through.
			if method != "tools/call" {
				return next(ctx, method, req)
			}

			began := time.Now()

			// Platform context (set by MCPToolCallMiddleware).
			pc := middleware.GetPlatformContext(ctx)
			logger.Printf("Starting tool call: %s (user: %s)", pc.ToolName, pc.UserID)

			res, callErr := next(ctx, method, req)

			// The measured duration covers the start log line and the call.
			elapsed := time.Since(began)
			switch {
			case callErr != nil:
				logger.Printf("Tool call failed: %s (duration: %v, error: %v)", pc.ToolName, elapsed, callErr)
			default:
				logger.Printf("Tool call completed: %s (duration: %v)", pc.ToolName, elapsed)
			}

			return res, callErr
		}
		return handler
	}
	return wrap
}
Example: Rate Limiting Middleware¶
package mymiddleware
import (
"context"
"sync"
"time"
"github.com/modelcontextprotocol/go-sdk/mcp"
"github.com/txn2/mcp-data-platform/pkg/middleware"
)
// RateLimiter tracks request timestamps per user so that at most limit
// requests are admitted within any window-long period.
type RateLimiter struct {
	// mu guards requests.
	mu sync.Mutex

	// requests maps a user ID to the timestamps of that user's admitted requests.
	requests map[string][]time.Time

	// limit is the number of requests admitted per user per window.
	limit int

	// window is how far back timestamps are counted against limit.
	window time.Duration
}
// NewRateLimiter returns a RateLimiter that admits up to limit requests per
// user within window, starting with no recorded requests.
func NewRateLimiter(limit int, window time.Duration) *RateLimiter {
	rl := new(RateLimiter)
	rl.limit = limit
	rl.window = window
	// The map must be initialized up front; allow writes to it.
	rl.requests = map[string][]time.Time{}
	return rl
}
// Middleware returns MCP middleware that applies the limiter to "tools/call"
// requests, keyed by the UserID from the platform context. A request over the
// limit is answered with a "rate limit exceeded" tool error result and a nil
// Go error; all other requests are passed to the next handler.
//
// NOTE(review): the platform context is used without a nil check — confirm
// GetPlatformContext always yields a usable value for tools/call requests.
func (r *RateLimiter) Middleware() mcp.Middleware {
	return func(next mcp.MethodHandler) mcp.MethodHandler {
		return func(ctx context.Context, method string, req mcp.Request) (mcp.Result, error) {
			// Only tool calls are rate limited.
			if method == "tools/call" {
				caller := middleware.GetPlatformContext(ctx).UserID
				if !r.allow(caller) {
					return middleware.NewToolResultError("rate limit exceeded"), nil
				}
			}
			return next(ctx, method, req)
		}
	}
}
// allow reports whether userID may make another request now. It discards the
// user's timestamps that are not after now-window, and admits the request
// only if fewer than limit timestamps remain; an admitted request is recorded
// with the current time.
//
// Expired timestamps are filtered in place, so the user's existing backing
// array is reused instead of allocating a new slice on every call, and a user
// left with no timestamps is removed from the map rather than kept as an
// empty entry.
func (r *RateLimiter) allow(userID string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()

	now := time.Now()
	cutoff := now.Add(-r.window)

	// In-place filter: recent shares the backing array with the stored slice.
	// This is safe because the write position never passes the read position.
	stored := r.requests[userID]
	recent := stored[:0]
	for _, ts := range stored {
		if ts.After(cutoff) {
			recent = append(recent, ts)
		}
	}

	if len(recent) >= r.limit {
		// Denied. Keep the pruned history, but do not retain empty entries
		// (only reachable when limit <= 0).
		if len(recent) == 0 {
			delete(r.requests, userID)
		} else {
			r.requests[userID] = recent
		}
		return false
	}

	// Admitted: record this request.
	r.requests[userID] = append(recent, now)
	return true
}
Register Custom Middleware¶
Register middleware on the MCP server using AddReceivingMiddleware():
import (
"log"
"os"
"time"
"github.com/txn2/mcp-data-platform/pkg/platform"
"mycompany/mymiddleware"
)
// main creates the platform and installs the logging and rate-limiting
// middleware on its MCP server before running it. A platform construction
// error is fatal: continuing with a nil platform would only fail later with a
// less useful message.
func main() {
	logger := log.New(os.Stdout, "[MCP] ", log.LstdFlags)
	rateLimiter := mymiddleware.NewRateLimiter(100, time.Minute)

	// trinoCfg is not defined in this snippet; supply your own Trino
	// toolkit configuration.
	p, err := platform.New(
		platform.WithServerName("custom-platform"),
		platform.WithTrinoToolkit("primary", trinoCfg),
	)
	if err != nil {
		log.Fatalf("creating platform: %v", err)
	}

	// Add custom middleware to the MCP server
	p.Server().AddReceivingMiddleware(mymiddleware.LoggingMiddleware(logger))
	p.Server().AddReceivingMiddleware(rateLimiter.Middleware())

	p.Run()
}
Custom Tool Handlers¶
Override or wrap an existing tool handler by registering a replacement under the same tool name.
import (
"context"
"github.com/modelcontextprotocol/go-sdk/mcp"
"github.com/txn2/mcp-data-platform/pkg/platform"
)
func main() {
p, _ := platform.New(
platform.WithServerName("custom-platform"),
platform.WithTrinoToolkit("primary", trinoCfg),
)
// Override a tool handler
p.Server().AddTool(mcp.Tool{
Name: "trino_query",
Description: "Execute SQL query with custom validation",
// ... schema
}, func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Custom validation
if !validateQuery(req) {
return mcp.NewToolResultError("query validation failed"), nil
}
// Delegate to original handler
return p.Toolkits().Get("trino", "primary").HandleQuery(ctx, req)
})
p.Run()
}
Extension Points Summary¶
| Extension | Interface | Use Case |
|---|---|---|
| Custom Toolkit | Toolkit | Add new tool categories |
| Semantic Provider | semantic.Provider | Custom metadata source |
| Query Provider | query.Provider | Custom query engine |
| Storage Provider | storage.Provider | Custom storage backend |
| Middleware | mcp.Middleware | Request/response processing |
| Tool Handler | MCP handler func | Override tool behavior |
Best Practices¶
Keep toolkits focused: Each toolkit should have a clear purpose. Don't mix unrelated tools in one toolkit.
Make providers cacheable: Implement caching at the provider level, or wrap the provider with semantic.CachedProvider.
Middleware order matters: Register middleware from outermost (runs first) to innermost. Authentication should run before authorization, because authorization decisions depend on the authenticated identity.
Handle errors gracefully: For expected failures, return a tool error via mcp.NewToolResultError() rather than a Go error.
Clean up resources: Implement Close() on all custom components and ensure the platform calls it on each of them.
Next Steps¶
- Providers Reference - Provider interface details
- Middleware Reference - Middleware patterns
- Architecture - System design