[codex] add flmctl deploy and object helpers#459
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a new flmctl deploy command to simplify application deployment by automating packaging, uploading to the object cache, and registration. It also adds support for binary and Python installers, updates the executor manager to support content-addressed releases, and includes a new example application. My review identified a critical logic error in fetch_object_bytes regarding chunked object retrieval, a potential stability issue with DefaultHasher for release IDs, and an overly restrictive path validation in the tarball unpacking logic.
| async fn fetch_object_bytes(reference: &ObjectRef) -> Result<Vec<u8>, FlameError> { | ||
| ObjectKey::from_key(&reference.key)?; | ||
| let endpoint = CacheEndpoint::parse(&reference.endpoint)?; | ||
| let tls = current_cache_tls()?; | ||
| let mut client = FlightClient::new(connect_cache(&endpoint, tls.as_ref()).await?); | ||
| let mut stream = client | ||
| .do_get(Ticket::new(format!("{}:0", reference.key))) | ||
| .await | ||
| .map_err(|e| FlameError::Internal(format!("cache get failed: {}", e)))?; | ||
|
|
||
| let mut base = None; | ||
| while let Some(batch) = stream | ||
| .try_next() | ||
| .await | ||
| .map_err(|e| FlameError::Internal(format!("cache get failed: {}", e)))? | ||
| { | ||
| if batch.num_rows() == 0 { | ||
| continue; | ||
| } | ||
| let data = data_column(&batch)?; | ||
| if let Some(kind) = kind_column(&batch)? { | ||
| for row in 0..batch.num_rows() { | ||
| match kind.value(row) { | ||
| OBJECT_KIND_BASE => { | ||
| if base.is_some() { | ||
| return Err(FlameError::InvalidConfig( | ||
| "object response contained multiple base rows".to_string(), | ||
| )); | ||
| } | ||
| base = Some(data.value(row).to_vec()); | ||
| } | ||
| OBJECT_KIND_PATCH => {} | ||
| other => { | ||
| return Err(FlameError::InvalidConfig(format!( | ||
| "invalid object response kind '{}'", | ||
| other | ||
| ))) | ||
| } | ||
| } | ||
| } | ||
| } else if base.is_none() { | ||
| base = Some(data.value(0).to_vec()); | ||
| } | ||
| } | ||
|
|
||
| base.ok_or_else(|| FlameError::NotFound(reference.key.clone())) | ||
| } |
There was a problem hiding this comment.
The fetch_object_bytes function contains a logic error that prevents it from correctly handling objects split across multiple rows or record batches. It currently returns an error if it encounters more than one OBJECT_KIND_BASE row, and it only retrieves the first row if the kind column is missing. Since do_put_file uploads objects in 1MB chunks as separate rows, this function will fail for any object larger than the chunk size. The implementation should accumulate bytes from all base rows across all batches in the stream.
async fn fetch_object_bytes(reference: &ObjectRef) -> Result<Vec<u8>, FlameError> {
ObjectKey::from_key(&reference.key)?;
let endpoint = CacheEndpoint::parse(&reference.endpoint)?;
let tls = current_cache_tls()?;
let mut client = FlightClient::new(connect_cache(&endpoint, tls.as_ref()).await?);
let mut stream = client
.do_get(Ticket::new(format!("{}:0", reference.key)))
.await
.map_err(|e| FlameError::Internal(format!("cache get failed: {}", e)))?;
let mut base_data = Vec::new();
let mut found = false;
while let Some(batch) = stream
.try_next()
.await
.map_err(|e| FlameError::Internal(format!("cache get failed: {}", e)))?
{
if batch.num_rows() == 0 {
continue;
}
let data = data_column(&batch)?;
let kind = kind_column(&batch)?;
for row in 0..batch.num_rows() {
let is_base = match kind {
Some(k) => k.value(row) == OBJECT_KIND_BASE,
None => true,
};
if is_base {
base_data.extend_from_slice(data.value(row));
found = true;
}
}
}
if !found {
return Err(FlameError::NotFound(reference.key.clone()));
}
Ok(base_data)
}| fn release_id(&self) -> String { | ||
| let mut hasher = std::collections::hash_map::DefaultHasher::new(); | ||
| self.hash(&mut hasher); | ||
| format!("{:016x}", hasher.finish()) | ||
| } |
There was a problem hiding this comment.
Using std::collections::hash_map::DefaultHasher for generating a release_id that is used as a persistent directory name is not recommended. The algorithm used by DefaultHasher is not guaranteed to be stable across different Rust compiler versions or even different runs of the same binary. If the cluster is upgraded and the hasher implementation changes, the executor manager will fail to locate existing installations on disk, leading to redundant downloads and orphaned directories. Consider using a stable hashing algorithm like SHA-256 or a fixed SipHash implementation to ensure consistent release IDs.
| fn validate_relative_path(path: &Path) -> Result<(), FlameError> { | ||
| for component in path.components() { | ||
| match component { | ||
| Component::Normal(_) => {} | ||
| _ => { | ||
| return Err(FlameError::InvalidConfig(format!( | ||
| "unsafe archive path: {}", | ||
| path.display() | ||
| ))) | ||
| } | ||
| } | ||
| } | ||
| Ok(()) | ||
| } | ||
|
|
There was a problem hiding this comment.
The validate_relative_path function is too restrictive because it rejects paths containing Component::CurDir (e.g., ./). Many tarballs created with common tools (like tar -C dir .) include a leading ./ for every entry. This implementation will cause flmctl deploy to fail on valid application packages that use this standard layout. The function should be updated to allow Component::CurDir while still rejecting dangerous components like ParentDir and RootDir.
| fn validate_relative_path(path: &Path) -> Result<(), FlameError> { | |
| for component in path.components() { | |
| match component { | |
| Component::Normal(_) => {} | |
| _ => { | |
| return Err(FlameError::InvalidConfig(format!( | |
| "unsafe archive path: {}", | |
| path.display() | |
| ))) | |
| } | |
| } | |
| } | |
| Ok(()) | |
| } | |
| fn validate_relative_path(path: &Path) -> Result<(), FlameError> { | |
| for component in path.components() { | |
| match component { | |
| Component::Normal(_) | Component::CurDir => {} | |
| _ => { | |
| return Err(FlameError::InvalidConfig(format!( | |
| "unsafe archive path: {}", | |
| path.display() | |
| ))) | |
| } | |
| } | |
| } | |
| Ok(()) | |
| } |
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
Add RFE458 design and flmctl deploy packaging for directory, tarball, and executable application inputs. Add Rust SDK object helpers, object-cache integration coverage, binary application installer support, and example installation through flmadm. Update Pi and Candle Rust examples for the new typed API and direct flmctl deploy flow.
Summary
flmctl deployfor packaging directory, tarball, or executable-file applicationsexamples/candle/basedNotes
Executable-file deploys create
<app>.tar.gzlocally withbin/<binary>inside, then upload it under a three-component object key like<app>/pkg/<app>-<sha16>.tar.gzto match FlamePy/object-cache rules.Host-built example binaries were intentionally not committed.
Validation
cargo fmt --all -- --checkcargo test -p flmctl deploycargo test -p flame-rs object --libcargo test -p flame-executor-manager binarycargo test -p flame-object-cache flame_rs_with local-port permissioncargo check -p flmctlcargo check -p flame-rscargo check -p flame-executor-managergit diff --check