Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions bundle/config/mutator/configure_wsfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ func (m *configureWSFS) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagno
return nil
}

// On serverless (client version 2.5+), use the native sync root directly via FUSE.
// The FUSE provides capabilities for both reading and writing notebooks. It also
// is much faster and enables running cloud tests on DBR, since otherwise the tests
// fail with an AsyncFlushError because of the conflict between writing to FUSE
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

q - does this also impact Dabs-in-ws?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is necessary to make the bundle generate --bind command work. Right now DABs in ws mixes the API and FUSE. This PR makes that consistent.

// and via the workspace APIs simultaneously.
//
// Writing notebooks via FUSE is only supported for serverless client version 2+.
// Since we could only test v2.5, since the platform only allows selecting v2 (which is v2.5 internally),
// we restrict FUSE to only be used for v2.5+.
v := dbr.GetVersion(ctx)
if v.Type == dbr.ClusterTypeServerless && (v.Major > 2 || (v.Major == 2 && v.Minor >= 5)) {
return nil
}

// If so, swap out vfs.Path instance of the sync root with one that
// makes all Workspace File System interactions extension aware.
p, err := vfs.NewFilerPath(ctx, root, func(path string) (filer.Filer, error) {
Expand Down
59 changes: 51 additions & 8 deletions bundle/config/mutator/configure_wsfs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mutator_test

import (
"context"
"reflect"
"runtime"
"testing"

Expand Down Expand Up @@ -53,13 +54,55 @@ func TestConfigureWSFS_SkipsIfNotRunningOnRuntime(t *testing.T) {
assert.Equal(t, originalSyncRoot, b.SyncRoot)
}

func TestConfigureWSFS_SwapSyncRoot(t *testing.T) {
b := mockBundleForConfigureWSFS(t, "/Workspace/foo")
originalSyncRoot := b.SyncRoot
func TestConfigureWSFS_DBRVersions(t *testing.T) {
tests := []struct {
name string
version string
expectFUSE bool // true = osPath (uses FUSE), false = filerPath (uses wsfs extension)
}{
// Serverless client version 2.5+ should use FUSE directly (osPath)
{"serverless_client_2_5", "client.2.5", true},
{"serverless_client_2_6", "client.2.6", true},
{"serverless_client_3", "client.3", true},
{"serverless_client_3_0", "client.3.0", true},
{"serverless_client_3_6", "client.3.6", true},
{"serverless_client_4_9", "client.4.9", true},
{"serverless_client_4_10", "client.4.10", true},

ctx := context.Background()
ctx = dbr.MockRuntime(ctx, dbr.Environment{IsDbr: true, Version: "15.4"})
diags := bundle.Apply(ctx, b, mutator.ConfigureWSFS())
assert.Empty(t, diags)
assert.NotEqual(t, originalSyncRoot, b.SyncRoot)
// Serverless client version < 2.5 should use wsfs extension client (filerPath)
{"serverless_client_1", "client.1", false},
{"serverless_client_1_13", "client.1.13", false},
{"serverless_client_2", "client.2", false},
{"serverless_client_2_0", "client.2.0", false},
{"serverless_client_2_1", "client.2.1", false},
{"serverless_client_2_4", "client.2.4", false},

// Interactive (non-serverless) versions should use wsfs extension client (filerPath)
{"interactive_15_4", "15.4", false},
{"interactive_16_3", "16.3", false},
{"interactive_16_4", "16.4", false},
{"interactive_17_0", "17.0", false},
{"interactive_17_1", "17.1", false},
{"interactive_17_2", "17.2", false},
{"interactive_17_3", "17.3", false},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := mockBundleForConfigureWSFS(t, "/Workspace/foo")

ctx := context.Background()
ctx = dbr.MockRuntime(ctx, dbr.Environment{IsDbr: true, Version: tt.version})
diags := bundle.Apply(ctx, b, mutator.ConfigureWSFS())
assert.Empty(t, diags)

// Check the underlying type of SyncRoot
typeName := reflect.TypeOf(b.SyncRoot).String()
if tt.expectFUSE {
assert.Equal(t, "*vfs.osPath", typeName, "expected osPath (FUSE) for version %s", tt.version)
} else {
assert.Equal(t, "*vfs.filerPath", typeName, "expected filerPath (wsfs extension) for version %s", tt.version)
}
})
}
}
77 changes: 76 additions & 1 deletion libs/dbr/context.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package dbr

import "context"
import (
"context"
"strconv"
"strings"
)

// key is a package-local type to use for context keys.
//
Expand All @@ -15,6 +19,71 @@ const (
dbrKey = key(1)
)

// ClusterType represents the type of Databricks cluster.
type ClusterType int

const (
ClusterTypeUnknown ClusterType = iota
ClusterTypeInteractive
ClusterTypeServerless
)

func (t ClusterType) String() string {
switch t {
case ClusterTypeInteractive:
return "interactive"
case ClusterTypeServerless:
return "serverless"
default:
return "unknown"
}
}

// Version represents a parsed DBR version.
type Version struct {
Type ClusterType
Major int
Minor int
Raw string
}

// ParseVersion parses a DBR version string and returns structured version info.
// Examples:
// - "16.3" -> Interactive, Major=16, Minor=3
// - "client.4.9" -> Serverless, Major=4, Minor=9
func ParseVersion(version string) Version {
result := Version{Raw: version}

if version == "" {
return result
}

// Serverless versions have "client." prefix
if strings.HasPrefix(version, "client.") {
result.Type = ClusterTypeServerless
// Parse "client.X.Y" format
parts := strings.Split(strings.TrimPrefix(version, "client."), ".")
if len(parts) >= 1 {
result.Major, _ = strconv.Atoi(parts[0])
}
if len(parts) >= 2 {
result.Minor, _ = strconv.Atoi(parts[1])
}
return result
}

// Interactive versions are "X.Y" format
result.Type = ClusterTypeInteractive
parts := strings.Split(version, ".")
if len(parts) >= 1 {
result.Major, _ = strconv.Atoi(parts[0])
}
if len(parts) >= 2 {
result.Minor, _ = strconv.Atoi(parts[1])
}
return result
}

type Environment struct {
IsDbr bool
Version string
Expand Down Expand Up @@ -61,3 +130,9 @@ func RuntimeVersion(ctx context.Context) string {

return v.(Environment).Version
}

// GetVersion returns the parsed runtime version from the context.
// It expects a context returned by [DetectRuntime] or [MockRuntime].
func GetVersion(ctx context.Context) Version {
return ParseVersion(RuntimeVersion(ctx))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't RuntimeVersion() also return a Version type?

}
86 changes: 86 additions & 0 deletions libs/dbr/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,89 @@ func TestContext_RuntimeVersionWithMock(t *testing.T) {
assert.Equal(t, "15.4", RuntimeVersion(MockRuntime(ctx, Environment{IsDbr: true, Version: "15.4"})))
assert.Empty(t, RuntimeVersion(MockRuntime(ctx, Environment{})))
}

func TestParseVersion_Serverless(t *testing.T) {
tests := []struct {
version string
expectedType ClusterType
expectedMajor int
expectedMinor int
}{
{"client.4.9", ClusterTypeServerless, 4, 9},
{"client.4.10", ClusterTypeServerless, 4, 10},
{"client.3.6", ClusterTypeServerless, 3, 6},
{"client.2", ClusterTypeServerless, 2, 0},
{"client.2.1", ClusterTypeServerless, 2, 1},
{"client.1", ClusterTypeServerless, 1, 0},
{"client.1.13", ClusterTypeServerless, 1, 13},
}

for _, tt := range tests {
t.Run(tt.version, func(t *testing.T) {
v := ParseVersion(tt.version)
assert.Equal(t, tt.expectedType, v.Type)
assert.Equal(t, tt.expectedMajor, v.Major)
assert.Equal(t, tt.expectedMinor, v.Minor)
assert.Equal(t, tt.version, v.Raw)
})
}
}

func TestParseVersion_Interactive(t *testing.T) {
tests := []struct {
version string
expectedType ClusterType
expectedMajor int
expectedMinor int
}{
{"16.3", ClusterTypeInteractive, 16, 3},
{"16.4", ClusterTypeInteractive, 16, 4},
{"17.0", ClusterTypeInteractive, 17, 0},
{"17.1", ClusterTypeInteractive, 17, 1},
{"17.2", ClusterTypeInteractive, 17, 2},
{"17.3", ClusterTypeInteractive, 17, 3},
{"15.4", ClusterTypeInteractive, 15, 4},
}

for _, tt := range tests {
t.Run(tt.version, func(t *testing.T) {
v := ParseVersion(tt.version)
assert.Equal(t, tt.expectedType, v.Type)
assert.Equal(t, tt.expectedMajor, v.Major)
assert.Equal(t, tt.expectedMinor, v.Minor)
assert.Equal(t, tt.version, v.Raw)
})
}
}

func TestParseVersion_Empty(t *testing.T) {
v := ParseVersion("")
assert.Equal(t, ClusterTypeUnknown, v.Type)
assert.Equal(t, 0, v.Major)
assert.Equal(t, 0, v.Minor)
assert.Equal(t, "", v.Raw)
}

func TestClusterType_String(t *testing.T) {
assert.Equal(t, "interactive", ClusterTypeInteractive.String())
assert.Equal(t, "serverless", ClusterTypeServerless.String())
assert.Equal(t, "unknown", ClusterTypeUnknown.String())
}

func TestContext_GetVersion(t *testing.T) {
ctx := context.Background()

// Test serverless version
serverlessCtx := MockRuntime(ctx, Environment{IsDbr: true, Version: "client.4.9"})
v := GetVersion(serverlessCtx)
assert.Equal(t, ClusterTypeServerless, v.Type)
assert.Equal(t, 4, v.Major)
assert.Equal(t, 9, v.Minor)

// Test interactive version
interactiveCtx := MockRuntime(ctx, Environment{IsDbr: true, Version: "17.3"})
v = GetVersion(interactiveCtx)
assert.Equal(t, ClusterTypeInteractive, v.Type)
assert.Equal(t, 17, v.Major)
assert.Equal(t, 3, v.Minor)
}