Kafka MCP Server
An MCP server implementation for Kafka, allowing LLMs to interact with and manage Kafka clusters.
Features
- Cluster Management: View broker details
describe_cluster,describe_brokers. - Topic Management: List
list_topics, createcreate_topic, deletedelete_topic, describedescribe_topic, and increase partitionscreate_partitions. - Configuration Management: View
describe_configsand modifyalter_configsdynamic configs for topics, brokers, and groups. - Consumer Groups: List
list_consumer_groups, describedescribe_consumer_group, and securely manage offsets withreset_consumer_group_offsetandrewind_consumer_group_offset_by_timestamp. Advanced tools include state validation, dry runs, and execution audit logging. - Messaging: Consume messages
consume_messages(from beginning, latest, or specific offsets) and produce messagesproduce_message.
Prerequisites
- Python 3.10+
uvpackage manager (recommended)- A running Kafka cluster (e.g., local Docker, Confluent Cloud, etc.)
Installation
- Clone the repository.
- Install dependencies:
uv sync
Configuration
The server requires the KAFKA_BOOTSTRAP_SERVERS environment variable.
KAFKA_BOOTSTRAP_SERVERS: Comma-separated list of broker urls (e.g.,localhost:9092).KAFKA_CLIENT_ID: (Optional) Client ID for connection (default:kafka-mcp).
Usage
Running the Server
You can run the server directly using uv or python, or use Docker.
Using uv (Recommended)
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
uv run kafka-mcp
Using Docker
-
Build the Docker image:
docker build -t kafka-mcp . -
Run the container:
docker run -i --rm -e KAFKA_BOOTSTRAP_SERVERS=host.docker.internal:9092 kafka-mcp(Note: Use
host.docker.internalinstead oflocalhostif your Kafka cluster is running on the host machine.)
Claude Desktop Configuration
Add the following to your Claude Desktop configuration file (claude_desktop_config.json):
{
"mcpServers": {
"kafka": {
"command": "<uv PATH>",
"args": [
"--directory",
"<kafka-mcp PATH>",
"run",
"kafka-mcp"
],
"env": {
"KAFKA_BOOTSTRAP_SERVERS": "localhost:9092"
}
}
}
}
Debugging / Development
To verify that the server can start and connect to your Kafka cluster (ensure your Kafka is running first):
# Set your bootstrap server
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
# Run a quick check
uv run python -c "from src.kafka_mcp import main; print('Imports successful')"
Available Tools
| Category | Tool Name | Description |
|---|---|---|
| Cluster | describe_cluster | Get cluster metadata (controller, brokers). |
describe_brokers | List all brokers. | |
| Topics | list_topics | List all available topics. |
describe_topic | Get detailed info (partitions, replicas) for a topic. | |
create_topic | Create a new topic with partitions/replication factor. | |
delete_topic | Delete a topic. | |
create_partitions | Increase partitions for a topic. | |
| Configs | describe_configs | View dynamic configs for topic/broker/group. |
alter_configs | Update dynamic configs. | |
| Consumers | list_consumer_groups | List all active consumer groups. |
describe_consumer_group | Get members and state of a group. | |
get_consumer_group_offsets | Get committed offset, high/low watermarks, and calculate total lag for a topic. | |
reset_consumer_group_offset | Safely change consumer group offsets to earliest, latest, or a specific offset. | |
rewind_consumer_group_offset_by_timestamp | Rewind/advance consumer group offsets securely based on a timestamp. | |
| Messages | consume_messages | Consume messages from a topic (supports offsets, limits). |
produce_message | Send a message to a topic. |
Project Structure
src/kafka_mcp/
├── configs/ # Configuration handling
├── connections/ # Kafka client factories (singleton)
├── tools/ # Tool implementations
│ ├── admin.py # Topic & Config management
│ ├── cluster.py # Cluster metadata
│ ├── consumer.py # Consumer group & message consumption
│ └── producer.py # Message production
└── main.py # Entry point & MCP tool registration
Troubleshooting
- Connection Refused: Ensure
KAFKA_BOOTSTRAP_SERVERSis correct and reachable.
TODO
- SASL
- JMX