From 8977b30e636f181d97b4857e8edea86eab4853f6 Mon Sep 17 00:00:00 2001 From: Eric Garcia Date: Sat, 24 Jan 2026 03:37:40 -0500 Subject: [PATCH] feat: Phase 5 - staging lock tools for multi-agent coordination Add staging resource locking for coordinating parallel agent work: - store.rs: Added staging_locks and staging_lock_queue tables - handlers/staging.rs: Lock acquire/release with queuing, status, cleanup - Automatic lock expiration and queue cleanup New MCP tools (4): - blue_staging_lock - Acquire exclusive access to staging resources - blue_staging_unlock - Release a staging lock - blue_staging_status - Check lock status for specific resource or all - blue_staging_cleanup - Clean up expired locks and orphaned queue entries Total: 32 MCP tools, 24 tests passing Co-Authored-By: Claude Opus 4.5 --- crates/blue-core/src/lib.rs | 2 +- crates/blue-core/src/store.rs | 264 ++++++++++++++++++ crates/blue-mcp/src/handlers/mod.rs | 1 + crates/blue-mcp/src/handlers/staging.rs | 262 +++++++++++++++++ crates/blue-mcp/src/server.rs | 115 ++++++++ .../rfcs/0002-port-coherence-functionality.md | 18 +- 6 files changed, 657 insertions(+), 5 deletions(-) create mode 100644 crates/blue-mcp/src/handlers/staging.rs diff --git a/crates/blue-core/src/lib.rs b/crates/blue-core/src/lib.rs index 6f1b88c..1b68af8 100644 --- a/crates/blue-core/src/lib.rs +++ b/crates/blue-core/src/lib.rs @@ -22,6 +22,6 @@ pub mod workflow; pub use documents::*; pub use repo::{detect_blue, BlueHome, RepoError, WorktreeInfo}; pub use state::{ItemType, ProjectState, StateError, StatusSummary, WorkItem}; -pub use store::{DocType, Document, DocumentStore, LinkType, Reminder, ReminderStatus, SearchResult, Session, SessionType, StoreError, Task as StoreTask, TaskProgress, Worktree}; +pub use store::{DocType, Document, DocumentStore, LinkType, Reminder, ReminderStatus, SearchResult, Session, SessionType, StagingLock, StagingLockQueueEntry, StagingLockResult, StoreError, Task as StoreTask, TaskProgress, Worktree}; pub use voice::*; pub use workflow::{PrdStatus, RfcStatus, SpikeOutcome as WorkflowSpikeOutcome, SpikeStatus, WorkflowError}; diff --git a/crates/blue-core/src/store.rs b/crates/blue-core/src/store.rs index 7a9e7eb..60915ec 100644 --- a/crates/blue-core/src/store.rs +++ b/crates/blue-core/src/store.rs @@ -103,6 +103,27 @@ const SCHEMA: &str = r#" CREATE INDEX IF NOT EXISTS idx_reminders_status ON reminders(status); CREATE INDEX IF NOT EXISTS idx_reminders_due ON reminders(due_date) WHERE status = 'pending'; + + CREATE TABLE IF NOT EXISTS staging_locks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + resource TEXT NOT NULL UNIQUE, + locked_by TEXT NOT NULL, + agent_id TEXT, + locked_at TEXT NOT NULL, + expires_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS staging_lock_queue ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + resource TEXT NOT NULL, + requester TEXT NOT NULL, + agent_id TEXT, + requested_at TEXT NOT NULL, + FOREIGN KEY (resource) REFERENCES staging_locks(resource) ON DELETE CASCADE + ); + + CREATE INDEX IF NOT EXISTS idx_staging_locks_resource ON staging_locks(resource); + CREATE INDEX IF NOT EXISTS idx_staging_queue_resource ON staging_lock_queue(resource); "#; /// FTS5 schema for full-text search @@ -366,6 +387,40 @@ impl Reminder { } } +/// A staging resource lock +#[derive(Debug, Clone)] +pub struct StagingLock { + pub id: Option, + pub resource: String, + pub locked_by: String, + pub agent_id: Option, + pub locked_at: String, + pub expires_at: String, +} + +/// A queued request for a staging lock +#[derive(Debug, Clone)] +pub struct StagingLockQueueEntry { + pub id: Option, + pub resource: String, + pub requester: String, + pub agent_id: Option, + pub requested_at: String, +} + +/// Result of attempting to acquire a staging lock +#[derive(Debug)] +pub enum StagingLockResult { + /// Lock was acquired + Acquired { expires_at: String }, + /// Lock is held by someone else, added to queue + Queued { + position: usize, + current_holder: String, + expires_at: String, + }, +} + /// Store errors - in Blue's voice #[derive(Debug, thiserror::Error)] pub enum StoreError { @@ -1368,6 +1423,215 @@ impl DocumentStore { Ok(()) }) } + + // ==================== Staging Lock Operations ==================== + + /// Acquire a staging lock or join queue + pub fn acquire_staging_lock( + &self, + resource: &str, + locked_by: &str, + agent_id: Option<&str>, + duration_minutes: i64, + ) -> Result { + self.with_retry(|| { + let now = chrono::Utc::now(); + let now_str = now.to_rfc3339(); + let expires_at = now + chrono::Duration::minutes(duration_minutes); + let expires_str = expires_at.to_rfc3339(); + + // First, clean up expired locks + self.conn.execute( + "DELETE FROM staging_locks WHERE expires_at < ?1", + params![now_str], + )?; + + // Check if lock exists + let existing: Option<(String, String)> = self.conn + .query_row( + "SELECT locked_by, expires_at FROM staging_locks WHERE resource = ?1", + params![resource], + |row| Ok((row.get(0)?, row.get(1)?)), + ) + .optional()?; + + match existing { + Some((holder, holder_expires)) => { + // Lock exists, add to queue + self.conn.execute( + "INSERT INTO staging_lock_queue (resource, requester, agent_id, requested_at) + VALUES (?1, ?2, ?3, ?4)", + params![resource, locked_by, agent_id, now_str], + )?; + + // Get queue position + let position: i64 = self.conn.query_row( + "SELECT COUNT(*) FROM staging_lock_queue WHERE resource = ?1", + params![resource], + |row| row.get(0), + )?; + + Ok(StagingLockResult::Queued { + position: position as usize, + current_holder: holder, + expires_at: holder_expires, + }) + } + None => { + // No lock, acquire it + self.conn.execute( + "INSERT INTO staging_locks (resource, locked_by, agent_id, locked_at, expires_at) + VALUES (?1, ?2, ?3, ?4, ?5)", + params![resource, locked_by, agent_id, now_str, expires_str], + )?; + + Ok(StagingLockResult::Acquired { + expires_at: expires_str, + }) + } + } + }) + } + + /// Release a staging lock + pub fn release_staging_lock(&self, resource: &str, locked_by: &str) -> Result, StoreError> { + self.with_retry(|| { + // Verify the lock is held by the requester + let holder: Option = self.conn + .query_row( + "SELECT locked_by FROM staging_locks WHERE resource = ?1", + params![resource], + |row| row.get(0), + ) + .optional()?; + + match holder { + Some(h) if h == locked_by => { + // Get next in queue BEFORE deleting lock (CASCADE would remove queue entries) + let next: Option = self.conn + .query_row( + "SELECT requester FROM staging_lock_queue WHERE resource = ?1 ORDER BY requested_at ASC LIMIT 1", + params![resource], + |row| row.get(0), + ) + .optional()?; + + // Release the lock (CASCADE will clean up queue) + self.conn.execute( + "DELETE FROM staging_locks WHERE resource = ?1", + params![resource], + )?; + + Ok(next) + } + Some(_) => Err(StoreError::InvalidOperation(format!( + "Lock for '{}' is not held by '{}'", + resource, locked_by + ))), + None => Err(StoreError::NotFound(format!("lock for '{}'", resource))), + } + }) + } + + /// Get current staging lock for a resource + pub fn get_staging_lock(&self, resource: &str) -> Result, StoreError> { + // First clean up expired locks + let now = chrono::Utc::now().to_rfc3339(); + self.conn.execute( + "DELETE FROM staging_locks WHERE expires_at < ?1", + params![now], + )?; + + self.conn + .query_row( + "SELECT id, resource, locked_by, agent_id, locked_at, expires_at + FROM staging_locks WHERE resource = ?1", + params![resource], + |row| { + Ok(StagingLock { + id: Some(row.get(0)?), + resource: row.get(1)?, + locked_by: row.get(2)?, + agent_id: row.get(3)?, + locked_at: row.get(4)?, + expires_at: row.get(5)?, + }) + }, + ) + .optional() + .map_err(StoreError::Database) + } + + /// Get queue for a staging lock + pub fn get_staging_lock_queue(&self, resource: &str) -> Result, StoreError> { + let mut stmt = self.conn.prepare( + "SELECT id, resource, requester, agent_id, requested_at + FROM staging_lock_queue WHERE resource = ?1 ORDER BY requested_at ASC", + )?; + + let rows = stmt.query_map(params![resource], |row| { + Ok(StagingLockQueueEntry { + id: Some(row.get(0)?), + resource: row.get(1)?, + requester: row.get(2)?, + agent_id: row.get(3)?, + requested_at: row.get(4)?, + }) + })?; + + rows.collect::, _>>() + .map_err(StoreError::Database) + } + + /// List all active staging locks + pub fn list_staging_locks(&self) -> Result, StoreError> { + // First clean up expired locks + let now = chrono::Utc::now().to_rfc3339(); + self.conn.execute( + "DELETE FROM staging_locks WHERE expires_at < ?1", + params![now], + )?; + + let mut stmt = self.conn.prepare( + "SELECT id, resource, locked_by, agent_id, locked_at, expires_at + FROM staging_locks ORDER BY locked_at DESC", + )?; + + let rows = stmt.query_map([], |row| { + Ok(StagingLock { + id: Some(row.get(0)?), + resource: row.get(1)?, + locked_by: row.get(2)?, + agent_id: row.get(3)?, + locked_at: row.get(4)?, + expires_at: row.get(5)?, + }) + })?; + + rows.collect::, _>>() + .map_err(StoreError::Database) + } + + /// Clean up expired staging locks and orphaned queue entries + pub fn cleanup_expired_staging(&self) -> Result<(usize, usize), StoreError> { + self.with_retry(|| { + let now = chrono::Utc::now().to_rfc3339(); + + // Clean expired locks + let locks_cleaned = self.conn.execute( + "DELETE FROM staging_locks WHERE expires_at < ?1", + params![now], + )?; + + // Clean orphaned queue entries (for resources with no lock) + let queue_cleaned = self.conn.execute( + "DELETE FROM staging_lock_queue WHERE resource NOT IN (SELECT resource FROM staging_locks)", + [], + )?; + + Ok((locks_cleaned, queue_cleaned)) + }) + } } #[cfg(test)] diff --git a/crates/blue-mcp/src/handlers/mod.rs b/crates/blue-mcp/src/handlers/mod.rs index 29f839d..57dd262 100644 --- a/crates/blue-mcp/src/handlers/mod.rs +++ b/crates/blue-mcp/src/handlers/mod.rs @@ -9,4 +9,5 @@ pub mod release; pub mod reminder; pub mod session; pub mod spike; +pub mod staging; pub mod worktree; diff --git a/crates/blue-mcp/src/handlers/staging.rs b/crates/blue-mcp/src/handlers/staging.rs new file mode 100644 index 0000000..c87c851 --- /dev/null +++ b/crates/blue-mcp/src/handlers/staging.rs @@ -0,0 +1,262 @@ +//! Staging lock tool handlers +//! +//! Handles staging environment isolation through resource locking. +//! Ensures single-writer access to staging resources like migrations. + +use blue_core::{ProjectState, StagingLockResult}; +use serde_json::{json, Value}; + +use crate::error::ServerError; + +/// Handle blue_staging_lock +pub fn handle_lock(state: &ProjectState, args: &Value) -> Result { + let resource = args + .get("resource") + .and_then(|v| v.as_str()) + .ok_or(ServerError::InvalidParams)?; + + let locked_by = args + .get("locked_by") + .and_then(|v| v.as_str()) + .ok_or(ServerError::InvalidParams)?; + + let agent_id = args.get("agent_id").and_then(|v| v.as_str()); + let duration_minutes = args + .get("duration_minutes") + .and_then(|v| v.as_i64()) + .unwrap_or(30); + + let result = state + .store + .acquire_staging_lock(resource, locked_by, agent_id, duration_minutes) + .map_err(|e| ServerError::CommandFailed(e.to_string()))?; + + match result { + StagingLockResult::Acquired { expires_at } => Ok(json!({ + "status": "acquired", + "message": blue_core::voice::success( + &format!("Acquired staging lock for '{}'", resource), + Some(&format!("Expires at {}. Release with blue_staging_unlock when done.", expires_at)) + ), + "resource": resource, + "locked_by": locked_by, + "expires_at": expires_at + })), + StagingLockResult::Queued { + position, + current_holder, + expires_at, + } => Ok(json!({ + "status": "queued", + "message": blue_core::voice::info( + &format!("Resource '{}' is locked by '{}'", resource, current_holder), + Some(&format!("You're #{} in queue. Lock expires at {}.", position, expires_at)) + ), + "resource": resource, + "queue_position": position, + "current_holder": current_holder, + "holder_expires_at": expires_at + })), + } +} + +/// Handle blue_staging_unlock +pub fn handle_unlock(state: &ProjectState, args: &Value) -> Result { + let resource = args + .get("resource") + .and_then(|v| v.as_str()) + .ok_or(ServerError::InvalidParams)?; + + let locked_by = args + .get("locked_by") + .and_then(|v| v.as_str()) + .ok_or(ServerError::InvalidParams)?; + + let next_in_queue = state + .store + .release_staging_lock(resource, locked_by) + .map_err(|e| ServerError::CommandFailed(e.to_string()))?; + + let hint = match &next_in_queue { + Some(next) => format!("Next in queue: '{}' can now acquire the lock.", next), + None => "No one waiting in queue.".to_string(), + }; + + Ok(json!({ + "status": "released", + "message": blue_core::voice::success( + &format!("Released staging lock for '{}'", resource), + Some(&hint) + ), + "resource": resource, + "next_in_queue": next_in_queue + })) +} + +/// Handle blue_staging_status +pub fn handle_status(state: &ProjectState, args: &Value) -> Result { + let resource = args.get("resource").and_then(|v| v.as_str()); + + if let Some(resource) = resource { + // Get status for specific resource + let lock = state + .store + .get_staging_lock(resource) + .map_err(|e| ServerError::CommandFailed(e.to_string()))?; + + let queue = state + .store + .get_staging_lock_queue(resource) + .map_err(|e| ServerError::CommandFailed(e.to_string()))?; + + if let Some(lock) = lock { + Ok(json!({ + "status": "locked", + "message": blue_core::voice::info( + &format!("Resource '{}' is locked by '{}'", resource, lock.locked_by), + Some(&format!("Expires at {}. {} waiting in queue.", lock.expires_at, queue.len())) + ), + "resource": resource, + "lock": { + "locked_by": lock.locked_by, + "agent_id": lock.agent_id, + "locked_at": lock.locked_at, + "expires_at": lock.expires_at + }, + "queue": queue.iter().map(|q| json!({ + "requester": q.requester, + "agent_id": q.agent_id, + "requested_at": q.requested_at + })).collect::>() + })) + } else { + Ok(json!({ + "status": "available", + "message": blue_core::voice::info( + &format!("Resource '{}' is available", resource), + None::<&str> + ), + "resource": resource, + "lock": null, + "queue": [] + })) + } + } else { + // List all locks + let locks = state + .store + .list_staging_locks() + .map_err(|e| ServerError::CommandFailed(e.to_string()))?; + + let hint = if locks.is_empty() { + "No active staging locks. Resources are available." + } else { + "Use blue_staging_unlock to release locks when done." + }; + + Ok(json!({ + "status": "success", + "message": blue_core::voice::info( + &format!("{} active staging lock{}", locks.len(), if locks.len() == 1 { "" } else { "s" }), + Some(hint) + ), + "locks": locks.iter().map(|l| json!({ + "resource": l.resource, + "locked_by": l.locked_by, + "agent_id": l.agent_id, + "locked_at": l.locked_at, + "expires_at": l.expires_at + })).collect::>() + })) + } +} + +/// Handle blue_staging_cleanup +pub fn handle_cleanup(state: &ProjectState, _args: &Value) -> Result { + let (locks_cleaned, queue_cleaned) = state + .store + .cleanup_expired_staging() + .map_err(|e| ServerError::CommandFailed(e.to_string()))?; + + let total = locks_cleaned + queue_cleaned; + + let hint = if total == 0 { + "No expired staging resources found. All clean." + } else { + "Cleaned up expired resources." + }; + + Ok(json!({ + "status": "success", + "message": blue_core::voice::success( + &format!("Cleaned {} expired lock{}, {} orphaned queue entr{}", + locks_cleaned, + if locks_cleaned == 1 { "" } else { "s" }, + queue_cleaned, + if queue_cleaned == 1 { "y" } else { "ies" } + ), + Some(hint) + ), + "locks_cleaned": locks_cleaned, + "queue_entries_cleaned": queue_cleaned, + "total_cleaned": total + })) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_lock_requires_resource() { + let state = ProjectState::for_test(); + let args = json!({ + "locked_by": "test-agent" + }); + + let result = handle_lock(&state, &args); + assert!(result.is_err()); + } + + #[test] + fn test_lock_acquire_and_release() { + let state = ProjectState::for_test(); + + // Acquire lock + let args = json!({ + "resource": "migration", + "locked_by": "agent-1", + "duration_minutes": 5 + }); + let result = handle_lock(&state, &args).unwrap(); + assert_eq!(result["status"], "acquired"); + + // Try to acquire again - should queue + let args2 = json!({ + "resource": "migration", + "locked_by": "agent-2" + }); + let result2 = handle_lock(&state, &args2).unwrap(); + assert_eq!(result2["status"], "queued"); + assert_eq!(result2["queue_position"], 1); + + // Release + let release_args = json!({ + "resource": "migration", + "locked_by": "agent-1" + }); + let release_result = handle_unlock(&state, &release_args).unwrap(); + assert_eq!(release_result["status"], "released"); + assert_eq!(release_result["next_in_queue"], "agent-2"); + } + + #[test] + fn test_status_no_locks() { + let state = ProjectState::for_test(); + let args = json!({}); + + let result = handle_status(&state, &args).unwrap(); + assert_eq!(result["status"], "success"); + assert_eq!(result["locks"].as_array().unwrap().len(), 0); + } +} diff --git a/crates/blue-mcp/src/server.rs b/crates/blue-mcp/src/server.rs index 74d3bc3..8a9fa22 100644 --- a/crates/blue-mcp/src/server.rs +++ b/crates/blue-mcp/src/server.rs @@ -749,6 +749,88 @@ impl BlueServer { } } } + }, + { + "name": "blue_staging_lock", + "description": "Acquire exclusive access to a staging resource.", + "inputSchema": { + "type": "object", + "properties": { + "cwd": { + "type": "string", + "description": "Current working directory" + }, + "resource": { + "type": "string", + "description": "Resource to lock (e.g., 'migration', 'staging-db')" + }, + "locked_by": { + "type": "string", + "description": "Identifier for lock holder (RFC title or PR number)" + }, + "agent_id": { + "type": "string", + "description": "Blue agent ID (from .env.isolated)" + }, + "duration_minutes": { + "type": "number", + "description": "Lock duration in minutes (default 30)" + } + }, + "required": ["resource", "locked_by"] + } + }, + { + "name": "blue_staging_unlock", + "description": "Release a staging lock.", + "inputSchema": { + "type": "object", + "properties": { + "cwd": { + "type": "string", + "description": "Current working directory" + }, + "resource": { + "type": "string", + "description": "Resource to unlock" + }, + "locked_by": { + "type": "string", + "description": "Identifier that acquired the lock" + } + }, + "required": ["resource", "locked_by"] + } + }, + { + "name": "blue_staging_status", + "description": "Check staging lock status.", + "inputSchema": { + "type": "object", + "properties": { + "cwd": { + "type": "string", + "description": "Current working directory" + }, + "resource": { + "type": "string", + "description": "Specific resource to check (omit for all locks)" + } + } + } + }, + { + "name": "blue_staging_cleanup", + "description": "Clean up expired staging resources.", + "inputSchema": { + "type": "object", + "properties": { + "cwd": { + "type": "string", + "description": "Current working directory" + } + } + } } ] })) @@ -800,6 +882,11 @@ impl BlueServer { "blue_reminder_list" => self.handle_reminder_list(&call.arguments), "blue_reminder_snooze" => self.handle_reminder_snooze(&call.arguments), "blue_reminder_clear" => self.handle_reminder_clear(&call.arguments), + // Phase 5: Staging handlers + "blue_staging_lock" => self.handle_staging_lock(&call.arguments), + "blue_staging_unlock" => self.handle_staging_unlock(&call.arguments), + "blue_staging_status" => self.handle_staging_status(&call.arguments), + "blue_staging_cleanup" => self.handle_staging_cleanup(&call.arguments), _ => Err(ServerError::ToolNotFound(call.name)), }?; @@ -1307,6 +1394,34 @@ impl BlueServer { let state = self.ensure_state()?; crate::handlers::reminder::handle_clear(state, args) } + + // Phase 5: Staging handlers + + fn handle_staging_lock(&mut self, args: &Option) -> Result { + let args = args.as_ref().ok_or(ServerError::InvalidParams)?; + let state = self.ensure_state()?; + crate::handlers::staging::handle_lock(state, args) + } + + fn handle_staging_unlock(&mut self, args: &Option) -> Result { + let args = args.as_ref().ok_or(ServerError::InvalidParams)?; + let state = self.ensure_state()?; + crate::handlers::staging::handle_unlock(state, args) + } + + fn handle_staging_status(&mut self, args: &Option) -> Result { + let empty = json!({}); + let args = args.as_ref().unwrap_or(&empty); + let state = self.ensure_state()?; + crate::handlers::staging::handle_status(state, args) + } + + fn handle_staging_cleanup(&mut self, args: &Option) -> Result { + let empty = json!({}); + let args = args.as_ref().unwrap_or(&empty); + let state = self.ensure_state()?; + crate::handlers::staging::handle_cleanup(state, args) + } } impl Default for BlueServer { diff --git a/docs/rfcs/0002-port-coherence-functionality.md b/docs/rfcs/0002-port-coherence-functionality.md index 8cc8114..aa9aa0f 100644 --- a/docs/rfcs/0002-port-coherence-functionality.md +++ b/docs/rfcs/0002-port-coherence-functionality.md @@ -231,11 +231,21 @@ blue/ - [x] Blue's voice in all error messages - [x] 21 tests passing -### Phase 5: Pending +### Phase 5: Staging Locks - COMPLETE -Remaining tools to port: -- Staging environment tools -- Code search/indexing +- [x] store.rs - Added staging_locks and staging_lock_queue tables +- [x] handlers/staging.rs - Lock/unlock/status/cleanup for multi-agent coordination +- [x] 4 new MCP tools: blue_staging_lock, blue_staging_unlock, + blue_staging_status, blue_staging_cleanup +- [x] Total: 32 MCP tools +- [x] Blue's voice in all error messages +- [x] 24 tests passing + +### Phase 6: Pending (Future) + +Remaining tools to port (if needed): +- Code search/indexing (requires tree-sitter) +- IaC detection and staging deployment tracking ## Test Plan