feat: Phase 5 - staging lock tools for multi-agent coordination

Add staging resource locking for coordinating parallel agent work:

- store.rs: Added staging_locks and staging_lock_queue tables
- handlers/staging.rs: Lock acquire/release with queuing, status, cleanup
- Automatic lock expiration and queue cleanup

New MCP tools (4):
- blue_staging_lock - Acquire exclusive access to staging resources
- blue_staging_unlock - Release a staging lock
- blue_staging_status - Check lock status for specific resource or all
- blue_staging_cleanup - Clean up expired locks and orphaned queue entries

Total: 32 MCP tools, 24 tests passing

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Eric Garcia 2026-01-24 03:37:40 -05:00
parent 1c2ceb71d1
commit 8977b30e63
6 changed files with 657 additions and 5 deletions

View file

@ -22,6 +22,6 @@ pub mod workflow;
pub use documents::*;
pub use repo::{detect_blue, BlueHome, RepoError, WorktreeInfo};
pub use state::{ItemType, ProjectState, StateError, StatusSummary, WorkItem};
pub use store::{DocType, Document, DocumentStore, LinkType, Reminder, ReminderStatus, SearchResult, Session, SessionType, StoreError, Task as StoreTask, TaskProgress, Worktree};
pub use store::{DocType, Document, DocumentStore, LinkType, Reminder, ReminderStatus, SearchResult, Session, SessionType, StagingLock, StagingLockQueueEntry, StagingLockResult, StoreError, Task as StoreTask, TaskProgress, Worktree};
pub use voice::*;
pub use workflow::{PrdStatus, RfcStatus, SpikeOutcome as WorkflowSpikeOutcome, SpikeStatus, WorkflowError};

View file

@ -103,6 +103,27 @@ const SCHEMA: &str = r#"
CREATE INDEX IF NOT EXISTS idx_reminders_status ON reminders(status);
CREATE INDEX IF NOT EXISTS idx_reminders_due ON reminders(due_date) WHERE status = 'pending';
CREATE TABLE IF NOT EXISTS staging_locks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
resource TEXT NOT NULL UNIQUE,
locked_by TEXT NOT NULL,
agent_id TEXT,
locked_at TEXT NOT NULL,
expires_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS staging_lock_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
resource TEXT NOT NULL,
requester TEXT NOT NULL,
agent_id TEXT,
requested_at TEXT NOT NULL,
FOREIGN KEY (resource) REFERENCES staging_locks(resource) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_staging_locks_resource ON staging_locks(resource);
CREATE INDEX IF NOT EXISTS idx_staging_queue_resource ON staging_lock_queue(resource);
"#;
/// FTS5 schema for full-text search
@ -366,6 +387,40 @@ impl Reminder {
}
}
/// A staging resource lock
#[derive(Debug, Clone)]
pub struct StagingLock {
pub id: Option<i64>,
pub resource: String,
pub locked_by: String,
pub agent_id: Option<String>,
pub locked_at: String,
pub expires_at: String,
}
/// A queued request for a staging lock
#[derive(Debug, Clone)]
pub struct StagingLockQueueEntry {
pub id: Option<i64>,
pub resource: String,
pub requester: String,
pub agent_id: Option<String>,
pub requested_at: String,
}
/// Result of attempting to acquire a staging lock
#[derive(Debug)]
pub enum StagingLockResult {
/// Lock was acquired
Acquired { expires_at: String },
/// Lock is held by someone else, added to queue
Queued {
position: usize,
current_holder: String,
expires_at: String,
},
}
/// Store errors - in Blue's voice
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
@ -1368,6 +1423,215 @@ impl DocumentStore {
Ok(())
})
}
// ==================== Staging Lock Operations ====================
/// Acquire a staging lock or join queue
pub fn acquire_staging_lock(
&self,
resource: &str,
locked_by: &str,
agent_id: Option<&str>,
duration_minutes: i64,
) -> Result<StagingLockResult, StoreError> {
self.with_retry(|| {
let now = chrono::Utc::now();
let now_str = now.to_rfc3339();
let expires_at = now + chrono::Duration::minutes(duration_minutes);
let expires_str = expires_at.to_rfc3339();
// First, clean up expired locks
self.conn.execute(
"DELETE FROM staging_locks WHERE expires_at < ?1",
params![now_str],
)?;
// Check if lock exists
let existing: Option<(String, String)> = self.conn
.query_row(
"SELECT locked_by, expires_at FROM staging_locks WHERE resource = ?1",
params![resource],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.optional()?;
match existing {
Some((holder, holder_expires)) => {
// Lock exists, add to queue
self.conn.execute(
"INSERT INTO staging_lock_queue (resource, requester, agent_id, requested_at)
VALUES (?1, ?2, ?3, ?4)",
params![resource, locked_by, agent_id, now_str],
)?;
// Get queue position
let position: i64 = self.conn.query_row(
"SELECT COUNT(*) FROM staging_lock_queue WHERE resource = ?1",
params![resource],
|row| row.get(0),
)?;
Ok(StagingLockResult::Queued {
position: position as usize,
current_holder: holder,
expires_at: holder_expires,
})
}
None => {
// No lock, acquire it
self.conn.execute(
"INSERT INTO staging_locks (resource, locked_by, agent_id, locked_at, expires_at)
VALUES (?1, ?2, ?3, ?4, ?5)",
params![resource, locked_by, agent_id, now_str, expires_str],
)?;
Ok(StagingLockResult::Acquired {
expires_at: expires_str,
})
}
}
})
}
/// Release a staging lock
pub fn release_staging_lock(&self, resource: &str, locked_by: &str) -> Result<Option<String>, StoreError> {
self.with_retry(|| {
// Verify the lock is held by the requester
let holder: Option<String> = self.conn
.query_row(
"SELECT locked_by FROM staging_locks WHERE resource = ?1",
params![resource],
|row| row.get(0),
)
.optional()?;
match holder {
Some(h) if h == locked_by => {
// Get next in queue BEFORE deleting lock (CASCADE would remove queue entries)
let next: Option<String> = self.conn
.query_row(
"SELECT requester FROM staging_lock_queue WHERE resource = ?1 ORDER BY requested_at ASC LIMIT 1",
params![resource],
|row| row.get(0),
)
.optional()?;
// Release the lock (CASCADE will clean up queue)
self.conn.execute(
"DELETE FROM staging_locks WHERE resource = ?1",
params![resource],
)?;
Ok(next)
}
Some(_) => Err(StoreError::InvalidOperation(format!(
"Lock for '{}' is not held by '{}'",
resource, locked_by
))),
None => Err(StoreError::NotFound(format!("lock for '{}'", resource))),
}
})
}
/// Get current staging lock for a resource
pub fn get_staging_lock(&self, resource: &str) -> Result<Option<StagingLock>, StoreError> {
// First clean up expired locks
let now = chrono::Utc::now().to_rfc3339();
self.conn.execute(
"DELETE FROM staging_locks WHERE expires_at < ?1",
params![now],
)?;
self.conn
.query_row(
"SELECT id, resource, locked_by, agent_id, locked_at, expires_at
FROM staging_locks WHERE resource = ?1",
params![resource],
|row| {
Ok(StagingLock {
id: Some(row.get(0)?),
resource: row.get(1)?,
locked_by: row.get(2)?,
agent_id: row.get(3)?,
locked_at: row.get(4)?,
expires_at: row.get(5)?,
})
},
)
.optional()
.map_err(StoreError::Database)
}
/// Get queue for a staging lock
pub fn get_staging_lock_queue(&self, resource: &str) -> Result<Vec<StagingLockQueueEntry>, StoreError> {
let mut stmt = self.conn.prepare(
"SELECT id, resource, requester, agent_id, requested_at
FROM staging_lock_queue WHERE resource = ?1 ORDER BY requested_at ASC",
)?;
let rows = stmt.query_map(params![resource], |row| {
Ok(StagingLockQueueEntry {
id: Some(row.get(0)?),
resource: row.get(1)?,
requester: row.get(2)?,
agent_id: row.get(3)?,
requested_at: row.get(4)?,
})
})?;
rows.collect::<Result<Vec<_>, _>>()
.map_err(StoreError::Database)
}
/// List all active staging locks
pub fn list_staging_locks(&self) -> Result<Vec<StagingLock>, StoreError> {
// First clean up expired locks
let now = chrono::Utc::now().to_rfc3339();
self.conn.execute(
"DELETE FROM staging_locks WHERE expires_at < ?1",
params![now],
)?;
let mut stmt = self.conn.prepare(
"SELECT id, resource, locked_by, agent_id, locked_at, expires_at
FROM staging_locks ORDER BY locked_at DESC",
)?;
let rows = stmt.query_map([], |row| {
Ok(StagingLock {
id: Some(row.get(0)?),
resource: row.get(1)?,
locked_by: row.get(2)?,
agent_id: row.get(3)?,
locked_at: row.get(4)?,
expires_at: row.get(5)?,
})
})?;
rows.collect::<Result<Vec<_>, _>>()
.map_err(StoreError::Database)
}
/// Clean up expired staging locks and orphaned queue entries
pub fn cleanup_expired_staging(&self) -> Result<(usize, usize), StoreError> {
self.with_retry(|| {
let now = chrono::Utc::now().to_rfc3339();
// Clean expired locks
let locks_cleaned = self.conn.execute(
"DELETE FROM staging_locks WHERE expires_at < ?1",
params![now],
)?;
// Clean orphaned queue entries (for resources with no lock)
let queue_cleaned = self.conn.execute(
"DELETE FROM staging_lock_queue WHERE resource NOT IN (SELECT resource FROM staging_locks)",
[],
)?;
Ok((locks_cleaned, queue_cleaned))
})
}
}
#[cfg(test)]

View file

@ -9,4 +9,5 @@ pub mod release;
pub mod reminder;
pub mod session;
pub mod spike;
pub mod staging;
pub mod worktree;

View file

@ -0,0 +1,262 @@
//! Staging lock tool handlers
//!
//! Handles staging environment isolation through resource locking.
//! Ensures single-writer access to staging resources like migrations.
use blue_core::{ProjectState, StagingLockResult};
use serde_json::{json, Value};
use crate::error::ServerError;
/// Handle blue_staging_lock
pub fn handle_lock(state: &ProjectState, args: &Value) -> Result<Value, ServerError> {
let resource = args
.get("resource")
.and_then(|v| v.as_str())
.ok_or(ServerError::InvalidParams)?;
let locked_by = args
.get("locked_by")
.and_then(|v| v.as_str())
.ok_or(ServerError::InvalidParams)?;
let agent_id = args.get("agent_id").and_then(|v| v.as_str());
let duration_minutes = args
.get("duration_minutes")
.and_then(|v| v.as_i64())
.unwrap_or(30);
let result = state
.store
.acquire_staging_lock(resource, locked_by, agent_id, duration_minutes)
.map_err(|e| ServerError::CommandFailed(e.to_string()))?;
match result {
StagingLockResult::Acquired { expires_at } => Ok(json!({
"status": "acquired",
"message": blue_core::voice::success(
&format!("Acquired staging lock for '{}'", resource),
Some(&format!("Expires at {}. Release with blue_staging_unlock when done.", expires_at))
),
"resource": resource,
"locked_by": locked_by,
"expires_at": expires_at
})),
StagingLockResult::Queued {
position,
current_holder,
expires_at,
} => Ok(json!({
"status": "queued",
"message": blue_core::voice::info(
&format!("Resource '{}' is locked by '{}'", resource, current_holder),
Some(&format!("You're #{} in queue. Lock expires at {}.", position, expires_at))
),
"resource": resource,
"queue_position": position,
"current_holder": current_holder,
"holder_expires_at": expires_at
})),
}
}
/// Handle blue_staging_unlock
pub fn handle_unlock(state: &ProjectState, args: &Value) -> Result<Value, ServerError> {
let resource = args
.get("resource")
.and_then(|v| v.as_str())
.ok_or(ServerError::InvalidParams)?;
let locked_by = args
.get("locked_by")
.and_then(|v| v.as_str())
.ok_or(ServerError::InvalidParams)?;
let next_in_queue = state
.store
.release_staging_lock(resource, locked_by)
.map_err(|e| ServerError::CommandFailed(e.to_string()))?;
let hint = match &next_in_queue {
Some(next) => format!("Next in queue: '{}' can now acquire the lock.", next),
None => "No one waiting in queue.".to_string(),
};
Ok(json!({
"status": "released",
"message": blue_core::voice::success(
&format!("Released staging lock for '{}'", resource),
Some(&hint)
),
"resource": resource,
"next_in_queue": next_in_queue
}))
}
/// Handle blue_staging_status
pub fn handle_status(state: &ProjectState, args: &Value) -> Result<Value, ServerError> {
let resource = args.get("resource").and_then(|v| v.as_str());
if let Some(resource) = resource {
// Get status for specific resource
let lock = state
.store
.get_staging_lock(resource)
.map_err(|e| ServerError::CommandFailed(e.to_string()))?;
let queue = state
.store
.get_staging_lock_queue(resource)
.map_err(|e| ServerError::CommandFailed(e.to_string()))?;
if let Some(lock) = lock {
Ok(json!({
"status": "locked",
"message": blue_core::voice::info(
&format!("Resource '{}' is locked by '{}'", resource, lock.locked_by),
Some(&format!("Expires at {}. {} waiting in queue.", lock.expires_at, queue.len()))
),
"resource": resource,
"lock": {
"locked_by": lock.locked_by,
"agent_id": lock.agent_id,
"locked_at": lock.locked_at,
"expires_at": lock.expires_at
},
"queue": queue.iter().map(|q| json!({
"requester": q.requester,
"agent_id": q.agent_id,
"requested_at": q.requested_at
})).collect::<Vec<_>>()
}))
} else {
Ok(json!({
"status": "available",
"message": blue_core::voice::info(
&format!("Resource '{}' is available", resource),
None::<&str>
),
"resource": resource,
"lock": null,
"queue": []
}))
}
} else {
// List all locks
let locks = state
.store
.list_staging_locks()
.map_err(|e| ServerError::CommandFailed(e.to_string()))?;
let hint = if locks.is_empty() {
"No active staging locks. Resources are available."
} else {
"Use blue_staging_unlock to release locks when done."
};
Ok(json!({
"status": "success",
"message": blue_core::voice::info(
&format!("{} active staging lock{}", locks.len(), if locks.len() == 1 { "" } else { "s" }),
Some(hint)
),
"locks": locks.iter().map(|l| json!({
"resource": l.resource,
"locked_by": l.locked_by,
"agent_id": l.agent_id,
"locked_at": l.locked_at,
"expires_at": l.expires_at
})).collect::<Vec<_>>()
}))
}
}
/// Handle blue_staging_cleanup
pub fn handle_cleanup(state: &ProjectState, _args: &Value) -> Result<Value, ServerError> {
let (locks_cleaned, queue_cleaned) = state
.store
.cleanup_expired_staging()
.map_err(|e| ServerError::CommandFailed(e.to_string()))?;
let total = locks_cleaned + queue_cleaned;
let hint = if total == 0 {
"No expired staging resources found. All clean."
} else {
"Cleaned up expired resources."
};
Ok(json!({
"status": "success",
"message": blue_core::voice::success(
&format!("Cleaned {} expired lock{}, {} orphaned queue entr{}",
locks_cleaned,
if locks_cleaned == 1 { "" } else { "s" },
queue_cleaned,
if queue_cleaned == 1 { "y" } else { "ies" }
),
Some(hint)
),
"locks_cleaned": locks_cleaned,
"queue_entries_cleaned": queue_cleaned,
"total_cleaned": total
}))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_lock_requires_resource() {
let state = ProjectState::for_test();
let args = json!({
"locked_by": "test-agent"
});
let result = handle_lock(&state, &args);
assert!(result.is_err());
}
#[test]
fn test_lock_acquire_and_release() {
let state = ProjectState::for_test();
// Acquire lock
let args = json!({
"resource": "migration",
"locked_by": "agent-1",
"duration_minutes": 5
});
let result = handle_lock(&state, &args).unwrap();
assert_eq!(result["status"], "acquired");
// Try to acquire again - should queue
let args2 = json!({
"resource": "migration",
"locked_by": "agent-2"
});
let result2 = handle_lock(&state, &args2).unwrap();
assert_eq!(result2["status"], "queued");
assert_eq!(result2["queue_position"], 1);
// Release
let release_args = json!({
"resource": "migration",
"locked_by": "agent-1"
});
let release_result = handle_unlock(&state, &release_args).unwrap();
assert_eq!(release_result["status"], "released");
assert_eq!(release_result["next_in_queue"], "agent-2");
}
#[test]
fn test_status_no_locks() {
let state = ProjectState::for_test();
let args = json!({});
let result = handle_status(&state, &args).unwrap();
assert_eq!(result["status"], "success");
assert_eq!(result["locks"].as_array().unwrap().len(), 0);
}
}

View file

@ -749,6 +749,88 @@ impl BlueServer {
}
}
}
},
{
"name": "blue_staging_lock",
"description": "Acquire exclusive access to a staging resource.",
"inputSchema": {
"type": "object",
"properties": {
"cwd": {
"type": "string",
"description": "Current working directory"
},
"resource": {
"type": "string",
"description": "Resource to lock (e.g., 'migration', 'staging-db')"
},
"locked_by": {
"type": "string",
"description": "Identifier for lock holder (RFC title or PR number)"
},
"agent_id": {
"type": "string",
"description": "Blue agent ID (from .env.isolated)"
},
"duration_minutes": {
"type": "number",
"description": "Lock duration in minutes (default 30)"
}
},
"required": ["resource", "locked_by"]
}
},
{
"name": "blue_staging_unlock",
"description": "Release a staging lock.",
"inputSchema": {
"type": "object",
"properties": {
"cwd": {
"type": "string",
"description": "Current working directory"
},
"resource": {
"type": "string",
"description": "Resource to unlock"
},
"locked_by": {
"type": "string",
"description": "Identifier that acquired the lock"
}
},
"required": ["resource", "locked_by"]
}
},
{
"name": "blue_staging_status",
"description": "Check staging lock status.",
"inputSchema": {
"type": "object",
"properties": {
"cwd": {
"type": "string",
"description": "Current working directory"
},
"resource": {
"type": "string",
"description": "Specific resource to check (omit for all locks)"
}
}
}
},
{
"name": "blue_staging_cleanup",
"description": "Clean up expired staging resources.",
"inputSchema": {
"type": "object",
"properties": {
"cwd": {
"type": "string",
"description": "Current working directory"
}
}
}
}
]
}))
@ -800,6 +882,11 @@ impl BlueServer {
"blue_reminder_list" => self.handle_reminder_list(&call.arguments),
"blue_reminder_snooze" => self.handle_reminder_snooze(&call.arguments),
"blue_reminder_clear" => self.handle_reminder_clear(&call.arguments),
// Phase 5: Staging handlers
"blue_staging_lock" => self.handle_staging_lock(&call.arguments),
"blue_staging_unlock" => self.handle_staging_unlock(&call.arguments),
"blue_staging_status" => self.handle_staging_status(&call.arguments),
"blue_staging_cleanup" => self.handle_staging_cleanup(&call.arguments),
_ => Err(ServerError::ToolNotFound(call.name)),
}?;
@ -1307,6 +1394,34 @@ impl BlueServer {
let state = self.ensure_state()?;
crate::handlers::reminder::handle_clear(state, args)
}
// Phase 5: Staging handlers
fn handle_staging_lock(&mut self, args: &Option<Value>) -> Result<Value, ServerError> {
let args = args.as_ref().ok_or(ServerError::InvalidParams)?;
let state = self.ensure_state()?;
crate::handlers::staging::handle_lock(state, args)
}
fn handle_staging_unlock(&mut self, args: &Option<Value>) -> Result<Value, ServerError> {
let args = args.as_ref().ok_or(ServerError::InvalidParams)?;
let state = self.ensure_state()?;
crate::handlers::staging::handle_unlock(state, args)
}
fn handle_staging_status(&mut self, args: &Option<Value>) -> Result<Value, ServerError> {
let empty = json!({});
let args = args.as_ref().unwrap_or(&empty);
let state = self.ensure_state()?;
crate::handlers::staging::handle_status(state, args)
}
fn handle_staging_cleanup(&mut self, args: &Option<Value>) -> Result<Value, ServerError> {
let empty = json!({});
let args = args.as_ref().unwrap_or(&empty);
let state = self.ensure_state()?;
crate::handlers::staging::handle_cleanup(state, args)
}
}
impl Default for BlueServer {

View file

@ -231,11 +231,21 @@ blue/
- [x] Blue's voice in all error messages
- [x] 21 tests passing
### Phase 5: Pending
### Phase 5: Staging Locks - COMPLETE
Remaining tools to port:
- Staging environment tools
- Code search/indexing
- [x] store.rs - Added staging_locks and staging_lock_queue tables
- [x] handlers/staging.rs - Lock/unlock/status/cleanup for multi-agent coordination
- [x] 4 new MCP tools: blue_staging_lock, blue_staging_unlock,
blue_staging_status, blue_staging_cleanup
- [x] Total: 32 MCP tools
- [x] Blue's voice in all error messages
- [x] 24 tests passing
### Phase 6: Pending (Future)
Remaining tools to port (if needed):
- Code search/indexing (requires tree-sitter)
- IaC detection and staging deployment tracking
## Test Plan