Parquet MCP Server
MCP server for interacting with parquet files in a repository. Provides comprehensive data management with audit logging, rollback capabilities, and semantic search.
Credits
This is a custom MCP server implementation for parquet file management with audit trail support.
Features
- Read/Query: Query parquet files with filters, column selection, and limits
- Add Records: Add new records to parquet files with audit trail
- Update Records: Update existing records matching filters with audit trail
- Upsert Records: Insert or update records (supports enhanced filters for duplicate detection)
- Delete Records: Delete records matching filters with audit trail
- Schema Management: Create new data types and evolve existing schemas programmatically
- Audit Log: Complete change history with old/new values for all modifications
- Rollback: Undo specific operations using audit IDs
- Schema Discovery: Get schema definitions for data types
- Statistics: Get basic statistics about parquet files
- Efficient Backups: Audit log entries (~1 KB) instead of full snapshots (99%+ storage reduction)
- Automatic Snapshots: Timestamped snapshots created before every write operation for additional safety (see Backup & Recovery)
Installation
cd truth/mcp-servers/parquet
pip install -r requirements.txt
Configuration
Data Directory
The server uses the following priority to locate the data directory:
1. Environment variable (highest priority):
   export DATA_DIR="/path/to/your/data/directory"
2. Auto-detection (for backward compatibility):
   - Automatically detects the parent repository structure
   - Looks for a data/ directory in common locations
3. Default (if no structure is found):
   - Uses ~/.config/parquet-mcp/data/ as a fallback
Note: The MCP server is portable and can be used with any data directory structure.
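The paths referenced throughout this README imply roughly the following layout inside the data directory (illustrative; the bracketed names stand for your actual data types):

data/
  schemas/
    [type]_schema.json                      # schema definition per data type
  logs/
    audit_log.parquet                       # audit trail for all write operations
  snapshots/
    [type]-[YYYY-MM-DD-HHMMSS].parquet      # automatic pre-write snapshots
  [type]/
    [type].parquet                          # the records for each data type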
Cursor Configuration
Add to your Cursor MCP settings (typically ~/.cursor/mcp.json or Cursor settings):
Development (Audit Log Only):
{
"mcpServers": {
"parquet": {
"command": "python3",
"args": [
"/path/to/parquet_mcp_server.py"
],
"env": {
"DATA_DIR": "/path/to/your/data/directory"
}
}
}
}
Production (with legacy snapshot variables, now ignored; see Backup & Recovery):
{
"mcpServers": {
"parquet": {
"command": "python3",
"args": [
"/path/to/parquet_mcp_server.py"
],
"env": {
"DATA_DIR": "/path/to/your/data/directory",
"MCP_FULL_SNAPSHOTS": "true",
"MCP_SNAPSHOT_FREQUENCY": "weekly"
}
}
}
}
Note: Replace /path/to/parquet_mcp_server.py with the actual path to the server file.
Claude Desktop Configuration
Add to claude_desktop_config.json (typically ~/Library/Application Support/Claude/claude_desktop_config.json on macOS):
{
"mcpServers": {
"parquet": {
"command": "python3",
"args": [
"/path/to/parquet_mcp_server.py"
],
"env": {
"DATA_DIR": "/path/to/your/data/directory"
}
}
}
}
Note: Replace /path/to/parquet_mcp_server.py with the actual path to the server file.
Available Tools
Tool Selection Guide
For counting records: Use get_statistics - returns row_count efficiently without loading data.
For retrieving data: Use read_parquet - returns actual records with filtering, sorting, and column selection.
- Do not call `get_schema` unless you need schema information (field names, types, constraints) to complete the task. Data queries via `read_parquet` return all necessary field information.
For metadata: Use get_schema only when you need to understand the schema structure (required fields, valid values, data types) to build queries, filters, or add/update records. Do not call it for routine data retrieval tasks.
For modifications: Use add_record, update_records, upsert_record, or delete_records with audit logging.
Proactive Memory Storage
Source of Truth: Instructions for proactive memory storage are embedded in the MCP server:
- Tool descriptions - the `add_record`, `update_records`, and `upsert_record` descriptions include the requirements
- Prompt - the `data_persistence_guidance` prompt (via `list_prompts`/`get_prompt`) provides detailed guidance
Agents see these instructions when listing/using tools. This README is for human reference only.
list_data_types
List all available data types (parquet files) in the data directory.
get_schema
Get the schema definition for a data type.
Parameters:
- `data_type` (required): The data type name (e.g., 'flows', 'transactions', 'tasks')
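Example (illustrative, using one of the data type names above):
{
  "data_type": "tasks"
}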
create_data_type
Create a new data type with schema definition. Creates the schema JSON file, directory structure, and initializes an empty parquet file. Use when introducing a new data type that doesn't exist yet.
Parameters:
- `data_type` (required): The data type name (e.g., 'relationships', 'custom_events')
- `schema` (required): Schema definition object with `field_name: type_name` pairs. Supported types: 'string', 'integer', 'int64', 'float64', 'boolean', 'date', 'datetime', 'timestamp'
- `description` (required): Description of the data type and its purpose
- `version` (optional): Schema version (e.g., '1.0.0'; default: '1.0.0')
Example:
{
"data_type": "relationships",
"schema": {
"relationship_id": "string",
"source_contact_id": "string",
"target_contact_id": "string",
"relationship_type": "string",
"start_date": "date",
"end_date": "date",
"notes": "string",
"created_date": "date",
"updated_date": "date"
},
"description": "Structured relationships between contacts",
"version": "1.0.0"
}
update_schema
Update an existing data type schema. Supports adding new fields, updating description, and version bumping. Automatically adds missing columns to existing parquet file. Schema evolution only - does not support removing fields or changing field types (breaking changes require manual migration).
Parameters:
- `data_type` (required): The data type name (e.g., 'contacts', 'tasks')
- `add_fields` (optional): New fields to add: `{field_name: type_name}`. Types: 'string', 'integer', 'int64', 'float64', 'boolean', 'date', 'datetime', 'timestamp'
- `update_description` (optional): Updated description for the schema
- `version` (optional): New schema version (e.g., '1.1.0'). If not provided, the minor version is auto-incremented for field additions
- `update_parquet` (optional): Whether to update the parquet file to add missing columns (default: `true`)
Example:
{
"data_type": "contacts",
"add_fields": {
"middle_name": "string",
"suffix": "string"
},
"update_description": "Contact information with enhanced name fields",
"version": "1.1.0"
}
Limitations:
- Cannot remove fields (breaking change - requires manual migration)
- Cannot change field types (breaking change - requires manual migration)
- Only supports additive schema evolution
read_parquet
Read and query a parquet file with optional filters. Supports enhanced filtering operators.
Use when: You need to retrieve actual data records (rows) from the file.
Do not use for: Counting records - use get_statistics instead, which returns row count efficiently.
Parameters:
- `data_type` (required): The data type name
- `filters` (optional): Key-value pairs to filter records. Supports enhanced operators:
  - Simple value: exact match
  - List: in list (`["value1", "value2"]`)
  - `{"$contains": "text"}`: substring match (case-insensitive)
  - `{"$starts_with": "text"}`: prefix match (case-insensitive)
  - `{"$ends_with": "text"}`: suffix match (case-insensitive)
  - `{"$regex": "pattern"}`: regex pattern match
  - `{"$fuzzy": {"text": "query", "threshold": 0.7}}`: fuzzy string matching (0-1 similarity)
  - `{"$gt": 100}`, `{"$gte": 100}`, `{"$lt": 100}`, `{"$lte": 100}`: numeric comparisons
  - `{"$ne": "value"}`: not equal
- `limit` (optional): Maximum number of rows to return (default: 1000)
- `columns` (optional): List of column names to return (default: all columns)
- `sort_by` (optional): List of sort specifications, each with:
  - `column` (required): Column name to sort by
  - `ascending` (optional): Sort direction; `true` for ascending (default), `false` for descending
  - `na_position` (optional): Where to place null/NaN values: `"last"` (default) or `"first"`
  - `custom_order` (optional): Custom order for enum values (e.g., `["critical", "high", "medium", "low"]`)
Examples:
{
"data_type": "flows",
"filters": {
"category": "property_maintenance",
"year": 2025
},
"limit": 100
}
{
"data_type": "tasks",
"filters": {
"title": {"$contains": "therapy"},
"status": {"$ne": "completed"}
}
}
{
"data_type": "tasks",
"filters": {
"title": {"$fuzzy": {"text": "therapy session", "threshold": 0.7}}
}
}
Example with sorting:
{
"data_type": "tasks",
"filters": {
"sync_log": "exported"
},
"sort_by": [
{
"column": "sync_datetime",
"ascending": false,
"na_position": "last"
}
],
"limit": 10
}
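None of the examples above exercise the columns parameter. An illustrative call combining column selection with a filter (field names borrowed from the flows examples elsewhere in this README):
{
  "data_type": "flows",
  "filters": {
    "category": "housing"
  },
  "columns": ["flow_name", "flow_date", "amount_usd"],
  "limit": 100
}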
add_record
Add a new record to a parquet file. Creates audit log entry and optional snapshot.
Parameters:
- `data_type` (required): The data type name
- `record` (required): The record data as a JSON object matching the schema
Example:
{
"data_type": "flows",
"record": {
"flow_name": "Monthly Rent",
"flow_date": "2025-01-15",
"amount_usd": 1500.00,
"category": "housing",
"flow_type": "recurring_expense"
}
}
update_records
Update existing records in a parquet file. Creates audit log entry and optional snapshot.
Parameters:
- `data_type` (required): The data type name
- `filters` (required): Filters to identify records to update
- `updates` (required): Fields to update
Example:
{
"data_type": "tasks",
"filters": {
"task_id": "abc123"
},
"updates": {
"status": "completed",
"completed_date": "2025-01-15"
}
}
upsert_record
Insert or update a record (upsert). Checks for existing records using enhanced filters (supports all read_parquet filter operators including $contains, $fuzzy, etc.). If found, updates matching records. If not found, creates a new record. Returns whether it created or updated. Useful for preventing duplicates when adding contacts, tasks, or other records.
Parameters:
- `data_type` (required): The data type name
- `filters` (required): Enhanced filters to identify existing records (supports all `read_parquet` filter operators)
- `record` (required): The record data to insert or update
Returns:
- `action`: `"created"` or `"updated"`
- `audit_id` or `audit_ids`: Audit log entry ID(s)
- `record_id`: The ID of the created/updated record
Example (exact match):
{
"data_type": "contacts",
"filters": {
"email": "galina@secod.com"
},
"record": {
"name": "Galina Semakova",
"email": "galina@secod.com",
"category": "legal",
"last_contact_date": "2025-12-24"
}
}
Example (fuzzy match):
{
"data_type": "contacts",
"filters": {
"name": {"$fuzzy": {"text": "Galina Semakova", "threshold": 0.8}}
},
"record": {
"name": "Galina Semakova",
"email": "galina@secod.com",
"category": "legal",
"last_contact_date": "2025-12-24"
}
}
Example (contains match):
{
"data_type": "tasks",
"filters": {
"title": {"$contains": "therapy payment"}
},
"record": {
"title": "Pay for therapy session",
"status": "pending",
"due_date": "2025-12-25"
}
}
delete_records
Delete records from a parquet file. Creates audit log entry and optional snapshot.
Parameters:
- `data_type` (required): The data type name
- `filters` (required): Filters to identify records to delete
Example:
{
"data_type": "tasks",
"filters": {
"status": "canceled"
}
}
get_statistics
Get comprehensive statistics about a parquet file including row count, column information, date ranges, and categorical distributions.
Use when: You need to know how many records exist, get file metadata, or analyze data distributions. This is the efficient way to answer "how many" questions.
Returns:
- `row_count`: Total number of records (use this for count queries)
- `column_count`: Number of columns
- `date_statistics`: Min/max dates and ranges for date columns
- `categorical_statistics`: Value distributions for categorical columns
- Column types and memory usage
Parameters:
- `data_type` (required): The data type name
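Example (illustrative):
{
  "data_type": "flows"
}
The row_count field in the response answers "how many records" questions without loading the data itself.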
read_audit_log
Read audit log entries with optional filters. View complete history of all data modifications.
Parameters:
- `data_type` (optional): Filter by data type
- `operation` (optional): Filter by operation (add, update, delete)
- `record_id` (optional): Filter by specific record ID
- `limit` (optional): Maximum number of entries to return (default: 100)
Example:
{
"data_type": "transactions",
"operation": "update",
"limit": 50
}
rollback_operation
Roll back a specific operation using its audit ID. Creates an inverse operation to undo the changes.
Parameters:
- `audit_id` (required): The audit ID of the operation to roll back
Rollback Logic:
- `add` operation → Delete the record
- `update` operation → Restore old values
- `delete` operation → Restore the record
Example:
{
"audit_id": "abc123def456"
}
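A typical undo sequence combines the two audit tools. First, locate the operation with read_audit_log (the record_id value here is the hypothetical one from the earlier update example):
{
  "data_type": "tasks",
  "record_id": "abc123",
  "limit": 10
}
Then pass the audit_id from the matching entry to rollback_operation, as shown above. To undo multiple changes, roll back in reverse chronological order (see Recovery Options below).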
search_parquet
Semantic search using embeddings. Searches text fields for semantically similar records.
Parameters:
- `data_type` (required): The data type name
- `query` (required): Search query text
- `text_fields` (optional): List of text fields to search (default: auto-detect)
- `limit` (optional): Maximum number of results (default: 10)
- `min_similarity` (optional): Minimum cosine similarity threshold, 0-1 (default: 0.7)
- `additional_filters` (optional): Additional filters to apply (same format as `read_parquet`)
Prerequisites:
- Must run `generate_embeddings` first to create embeddings for the data type
- Requires the `OPENAI_API_KEY` environment variable
Example:
{
"data_type": "tasks",
"query": "pay for therapy session",
"limit": 5,
"min_similarity": 0.7
}
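Because additional_filters accepts the same operators as read_parquet, semantic search can be combined with structured constraints. An illustrative call that excludes completed tasks:
{
  "data_type": "tasks",
  "query": "pay for therapy session",
  "additional_filters": {
    "status": {"$ne": "completed"}
  },
  "limit": 5
}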
generate_embeddings
Generate and store embeddings for text fields in a data type. Creates embeddings parquet file for semantic search.
Parameters:
- `data_type` (required): The data type name
- `text_fields` (optional): List of text fields to generate embeddings for (default: auto-detect)
- `force_regenerate` (optional): Force regeneration of all embeddings (default: false)
Prerequisites:
- Requires the `OPENAI_API_KEY` environment variable
Example:
{
"data_type": "tasks",
"text_fields": ["title", "description", "notes"]
}
Note: Embeddings are cached. Only missing embeddings are generated unless force_regenerate is true.
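To rebuild the cache from scratch, for example after bulk-editing records, set force_regenerate (illustrative call):
{
  "data_type": "tasks",
  "force_regenerate": true
}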
Backup & Recovery
Audit Log (Default)
All write operations create lightweight audit log entries in data/logs/audit_log.parquet:
- Storage: ~1 KB per operation (99%+ reduction vs full snapshots)
- Content: Operation type, record ID, affected fields, old/new values, timestamp
- Recovery: Roll back specific operations using the `rollback_operation` tool
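The literal column layout of audit_log.parquet is defined by the server and not documented here, but based on the content listed above, a single entry conceptually captures something like this (illustrative shape only, with values reused from earlier examples):
{
  "audit_id": "abc123def456",
  "operation": "update",
  "data_type": "tasks",
  "record_id": "abc123",
  "old_values": {"status": "pending"},
  "new_values": {"status": "completed"},
  "timestamp": "2025-01-15T09:30:00Z"
}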
Automatic Snapshots
All write operations automatically create timestamped snapshots before modification (per policy requirement). This ensures rollback capability for every data change.
Snapshot Location:
data/snapshots/[data_type]-[YYYY-MM-DD-HHMMSS].parquet
Note: The MCP_FULL_SNAPSHOTS and MCP_SNAPSHOT_FREQUENCY environment variables are no longer used - snapshots are always created for all write operations.
Storage Comparison
| Approach | Storage per Operation | 100 Operations |
|---|---|---|
| Full snapshots (old) | 10 MB | 1 GB |
| Audit log (new) | ~1 KB | ~100 KB |
| Savings | 99.99% | 99.99% |
Recovery Options
- Recent changes: Use `rollback_operation` with the audit ID
- Multiple changes: Roll back operations in reverse chronological order
- Full restore: Restore from an automatic snapshot
- Point-in-time: Restore a snapshot, then replay the audit log up to a specific timestamp
See AUDIT_LOG_GUIDE.md for detailed documentation.
Data Types
The server automatically discovers data types by scanning data/ for directories containing [type].parquet files. Common data types include:
- `flows` - Cash flow and expense data
- `transactions` - Transaction data
- `tasks` - Task management data
- `contacts` - Contact/merchant information
- `income` - Income data
- `fixed_costs` - Fixed cost data
- And many more...
Error Handling
The server returns structured error messages in JSON format when operations fail. Common errors include:
- File not found errors
- Schema validation errors
- Column not found errors
- Filter matching errors
Security Notes
- All write operations create audit log entries for traceability
- Audit logs are stored in `data/logs/audit_log.parquet`
- Automatic snapshots provide an additional layer of safety
- Never commit sensitive data files to version control
Troubleshooting
- File Not Found Errors
  - Verify the data type exists at data/[type]/[type].parquet
  - Check file permissions
- Schema Validation Errors
  - Ensure records match the schema defined in data/schemas/[type]_schema.json
  - Check that all required fields are present
- Filter Matching Errors
  - Verify filter syntax matches the supported operators
  - Check that column names exist in the schema
Testing
After installation/updates, run the test script:
python3 mcp-servers/parquet/test_audit_log.py
This validates:
- Audit log creation
- Schema compliance
- Operation tracking
See IMPLEMENTATION_SUMMARY.md for manual testing procedures.
Documentation
- README.md - This file, overview and quick reference
- AUDIT_LOG_GUIDE.md - Complete audit log documentation
- IMPLEMENTATION_SUMMARY.md - Implementation details and testing
- SETUP.md - Setup and configuration instructions
Notes
- The server uses an audit log for efficient change tracking (99%+ storage reduction)
- All date fields are automatically converted to ISO format strings in responses
- Null/NaN values are converted to `null` in JSON responses
- The server runs in stdio mode for MCP communication
- Audit log entries are never automatically deleted (manual archival if needed)
License
MIT