Skip to content

Commit

Permalink
tart {clone,pull}: make deduplication opt-in (#924)
Browse files (browse the repository at this point in the history)
  • Loading branch information
edigaryev authored Oct 25, 2024
1 parent 3bf0bb2 commit b52a857
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 21 deletions.
5 changes: 4 additions & 1 deletion Sources/tart/Commands/Clone.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ struct Clone: AsyncParsableCommand {
@Option(help: "network concurrency to use when pulling a remote VM from the OCI-compatible registry")
var concurrency: UInt = 4

@Flag(help: .hidden)
var deduplicate: Bool = false

func validate() throws {
if newName.contains("/") {
throw ValidationError("<new-name> should be a local name")
Expand All @@ -45,7 +48,7 @@ struct Clone: AsyncParsableCommand {
if let remoteName = try? RemoteName(sourceName), !ociStorage.exists(remoteName) {
// Pull the VM in case it's OCI-based and doesn't exist locally yet
let registry = try Registry(host: remoteName.host, namespace: remoteName.namespace, insecure: insecure)
try await ociStorage.pull(remoteName, registry: registry, concurrency: concurrency)
try await ociStorage.pull(remoteName, registry: registry, concurrency: concurrency, deduplicate: deduplicate)
}

let sourceVM = try VMStorageHelper.open(sourceName)
Expand Down
5 changes: 4 additions & 1 deletion Sources/tart/Commands/Pull.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ struct Pull: AsyncParsableCommand {
@Option(help: "network concurrency to use when pulling a remote VM from the OCI-compatible registry")
var concurrency: UInt = 4

@Flag(help: .hidden)
var deduplicate: Bool = false

func validate() throws {
if concurrency < 1 {
throw ValidationError("network concurrency cannot be less than 1")
Expand All @@ -43,6 +46,6 @@ struct Pull: AsyncParsableCommand {

defaultLogger.appendNewLine("pulling \(remoteName)...")

try await VMStorageOCI().pull(remoteName, registry: registry, concurrency: concurrency)
try await VMStorageOCI().pull(remoteName, registry: registry, concurrency: concurrency, deduplicate: deduplicate)
}
}
2 changes: 1 addition & 1 deletion Sources/tart/OCI/Layerizer/Disk.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ import Foundation

protocol Disk {
static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, concurrency: UInt, progress: Progress) async throws -> [OCIManifestLayer]
static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache?) async throws
static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache?, deduplicate: Bool) async throws
}
2 changes: 1 addition & 1 deletion Sources/tart/OCI/Layerizer/DiskV1.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class DiskV1: Disk {
return pushedLayers
}

static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache? = nil) async throws {
static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache? = nil, deduplicate: Bool = false) async throws {
if !FileManager.default.createFile(atPath: diskURL.path, contents: nil) {
throw OCIError.FailedToCreateVmFile
}
Expand Down
31 changes: 20 additions & 11 deletions Sources/tart/OCI/Layerizer/DiskV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ class DiskV2: Disk {
}
}

static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache? = nil) async throws {
static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache? = nil, deduplicate: Bool = false) async throws {
// Support resumable pulls
let pullResumed = FileManager.default.fileExists(atPath: diskURL.path)

if !pullResumed {
if let localLayerCache = localLayerCache {
if deduplicate, let localLayerCache = localLayerCache {
// Clone the local layer cache's disk and use it as a base, potentially
// reducing the space usage since some blocks won't be written at all
try FileManager.default.copyItem(at: localLayerCache.diskURL, to: diskURL)
Expand Down Expand Up @@ -151,26 +151,31 @@ class DiskV2: Disk {

// Also open the disk file for reading and verifying
// its contents in case the local layer cache is used
let rdisk: FileHandle? = if localLayerCache != nil {
let rdisk: FileHandle? = if deduplicate && localLayerCache != nil {
try FileHandle(forReadingFrom: diskURL)
} else {
nil
}

// Check if we already have this layer contents in the local layer cache
if let localLayerCache = localLayerCache, let localLayerInfo = localLayerCache.findInfo(digest: diskLayer.digest, offsetHint: diskWritingOffset) {
// indicates that the locally cloned disk image has the same content at the given offset
let localHit = localLayerInfo.uncompressedContentDigest == uncompressedLayerContentDigest
&& localLayerInfo.range.lowerBound == diskWritingOffset
// doesn't seem that localHit can ever be false if the localLayerCache is not nil
// but let's just add extra safety here and check it
if !localHit {
// Check if we already have this layer contents in the local layer cache,
// or perhaps even on the cloned disk (when the deduplication is enabled)
if let localLayerCache = localLayerCache,
let localLayerInfo = localLayerCache.findInfo(digest: diskLayer.digest, offsetHint: diskWritingOffset),
localLayerInfo.uncompressedContentDigest == uncompressedLayerContentDigest {
if deduplicate && localLayerInfo.range.lowerBound == diskWritingOffset {
// Do nothing, because the data is already on the disk that we've inherited from
} else {
// Fulfil the layer contents from the local blob cache
let data = localLayerCache.subdata(localLayerInfo.range)
_ = try zeroSkippingWrite(disk, rdisk, fsBlockSize, diskWritingOffset, data)
}

try disk.close()

if let rdisk = rdisk {
try rdisk.close()
}

// Update the progress
progress.completedUnitCount += Int64(diskLayer.size)

Expand Down Expand Up @@ -198,6 +203,10 @@ class DiskV2: Disk {
try filter.finalize()

try disk.close()

if let rdisk = rdisk {
try rdisk.close()
}
}

globalDiskWritingOffset += uncompressedLayerSize
Expand Down
7 changes: 4 additions & 3 deletions Sources/tart/VMDirectory+OCI.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ enum OCIError: Error {
}

extension VMDirectory {
func pullFromRegistry(registry: Registry, manifest: OCIManifest, concurrency: UInt, localLayerCache: LocalLayerCache?) async throws {
func pullFromRegistry(registry: Registry, manifest: OCIManifest, concurrency: UInt, localLayerCache: LocalLayerCache?, deduplicate: Bool) async throws {
// Pull VM's config file layer and re-serialize it into a config file
let configLayers = manifest.layers.filter {
$0.mediaType == configMediaType
Expand Down Expand Up @@ -54,12 +54,13 @@ extension VMDirectory {
do {
try await diskImplType.pull(registry: registry, diskLayers: layers, diskURL: diskURL,
concurrency: concurrency, progress: progress,
localLayerCache: localLayerCache)
localLayerCache: localLayerCache,
deduplicate: deduplicate)
} catch let error where error is FilterError {
throw RuntimeError.PullFailed("failed to decompress disk: \(error.localizedDescription)")
}

if let llc = localLayerCache {
if deduplicate, let llc = localLayerCache {
// set custom attribute to remember deduplicated bytes
diskURL.setDeduplicatedBytes(llc.deduplicatedBytes)
}
Expand Down
10 changes: 7 additions & 3 deletions Sources/tart/VMStorageOCI.swift
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class VMStorageOCI: PrunableStorage {
try list().filter { (_, _, isSymlink) in !isSymlink }.map { (_, vmDir, _) in vmDir }
}

func pull(_ name: RemoteName, registry: Registry, concurrency: UInt) async throws {
func pull(_ name: RemoteName, registry: Registry, concurrency: UInt, deduplicate: Bool) async throws {
SentrySDK.configureScope { scope in
scope.setContext(value: ["imageName": name.description], key: "OCI")
}
Expand Down Expand Up @@ -203,10 +203,14 @@ class VMStorageOCI: PrunableStorage {
if let llc = localLayerCache {
let deduplicatedHuman = ByteCountFormatter.string(fromByteCount: Int64(llc.deduplicatedBytes), countStyle: .file)

defaultLogger.appendNewLine("found an image \(llc.name) that will allow us to deduplicate \(deduplicatedHuman), using it as a base...")
if deduplicate {
defaultLogger.appendNewLine("found an image \(llc.name) that will allow us to deduplicate \(deduplicatedHuman), using it as a base...")
} else {
defaultLogger.appendNewLine("found an image \(llc.name) that will allow us to avoid fetching \(deduplicatedHuman), will try use it...")
}
}

try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency, localLayerCache: localLayerCache)
try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency, localLayerCache: localLayerCache, deduplicate: deduplicate)
} recoverFromFailure: { error in
if error is Retryable {
print("Error: \(error.localizedDescription)")
Expand Down

0 comments on commit b52a857

Please sign in to comment.