Skip to content

Commit

Permalink
Working on multi-file uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
diamondap committed Jan 18, 2024
1 parent 0824c77 commit 9d7683c
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 27 deletions.
6 changes: 2 additions & 4 deletions core/bagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,8 @@ func (b *Bagger) pathForPayloadFile(fullPath string) string {
if filepath.Ext(b.OutputPath) == "" {
// Bag is a directory. We don't want to duplicate the
// bag name in the path because it's already there.
// We want something like this:
// /data/file.txt
// NOT this:
// /bag-name/data/file.txt
// We want something like this: /data/file.txt
// NOT this: /bag-name/data/file.txt
return fmt.Sprintf("data%s", shortPath)
}
// Else, bag is file and we do want the bag name to
Expand Down
65 changes: 56 additions & 9 deletions core/upload_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (
)

type UploadOperation struct {
Errors map[string]string `json:"errors"`
PayloadSize int64 `json:"payloadSize"`
Result *OperationResult `json:"result"`
SourceFiles []string `json:"sourceFiles"`
StorageService *StorageService `json:"storageService"`
MessageChannel chan *EventMessage `json:"-"`
Errors map[string]string `json:"errors"`
PayloadSize int64 `json:"payloadSize"`
Result *OperationResult `json:"result"`
SourceFiles []string `json:"sourceFiles"`
StorageService *StorageService `json:"storageService"`
MessageChannel chan *EventMessage `json:"-"`
ExpandedSourceList []string `json:"expandedSourceList"`
}

func NewUploadOperation(ss *StorageService, files []string) *UploadOperation {
Expand Down Expand Up @@ -64,8 +65,9 @@ func (u *UploadOperation) Validate() bool {
}

func (u *UploadOperation) CalculatePayloadSize() error {
u.expandSourceFileList()
u.PayloadSize = 0
for _, fileOrDir := range u.SourceFiles {
for _, fileOrDir := range u.ExpandedSourceList {
stat, err := os.Stat(fileOrDir)
if err != nil {
Dart.Log.Errorf("UploadOperation.CalculatePayloadSize - can't stat %s: %v", fileOrDir, err)
Expand All @@ -90,6 +92,7 @@ func (u *UploadOperation) CalculatePayloadSize() error {
}

func (u *UploadOperation) DoUpload(messageChannel chan *EventMessage) bool {
u.expandSourceFileList()
ok := false
switch u.StorageService.Protocol {
case constants.ProtocolS3:
Expand Down Expand Up @@ -124,7 +127,7 @@ func (u *UploadOperation) sendToS3(messageChannel chan *EventMessage) bool {
return false
}
allSucceeded := true
for _, sourceFile := range u.SourceFiles {
for _, sourceFile := range u.ExpandedSourceList {
s3Key := filepath.Base(sourceFile)
u.Result.RemoteURL = u.StorageService.URL(s3Key)
Dart.Log.Infof("Starting S3 upload %s to %s", sourceFile, u.Result.RemoteURL)
Expand Down Expand Up @@ -160,7 +163,7 @@ func (u *UploadOperation) sendToS3(messageChannel chan *EventMessage) bool {
// no progress updates.
func (u *UploadOperation) sendToSFTP(messageChannel chan *EventMessage) bool {
allSucceeded := true
for _, file := range u.SourceFiles {
for _, file := range u.ExpandedSourceList {
var progress *StreamProgress
if messageChannel != nil {
progress = NewStreamProgress(u.PayloadSize, messageChannel)
Expand All @@ -179,3 +182,47 @@ func (u *UploadOperation) useSSL() bool {
Dart.Log.Infof("Use SSL for upload = %t", useSSL)
return useSSL
}

func (u *UploadOperation) expandSourceFileList() error {

// START HERE

// TODO: Set a flag describing whether this has already been done,
// so we don't do it over and over. Also, consider using util.RecursiveFileList,
// since we're going to have to stat everything anyway.
//
// Also note that when uploading loose files to S3, they all go into
// the top-level dir. They should instead mirror the local struction.
//
// SFTP uploads seem to create the necessary directories, but
// they're created as files, and then
// all of the files go into the top-level directory. Fix that.
//
// Also, the progress bar for SFTP multifile uploads is schizo.
// It's not calculating progress against the total upload size.

u.ExpandedSourceList = make([]string, 0)
for _, filePath := range u.SourceFiles {
if util.IsDirectory(filePath) {
filesInSource, err := listDirRecursive(filePath)
if err != nil {
return err
}
u.ExpandedSourceList = append(u.ExpandedSourceList, filesInSource...)
} else {
u.ExpandedSourceList = append(u.ExpandedSourceList, filePath)
}
}
return nil
}

func listDirRecursive(dir string) ([]string, error) {
files := make([]string, 0)
err := filepath.Walk(dir, func(filePath string, f os.FileInfo, err error) error {
if f.Mode().IsRegular() || f.IsDir() {
files = append(files, filePath)
}
return nil
})
return files, err
}
3 changes: 3 additions & 0 deletions scripts/run.rb
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ def start_sftp
-p 2222:22 -d #{sftp_image_name}`
if $?.exitstatus == 0
puts "Started SFTP server with id #{@docker_sftp_id}"
puts "To log in and view the contents, use "
puts "sftp -P 2222 pw_user@localhost"
puts "The password is 'password' without the quotes"
@sftp_started = true
else
puts "Error starting SFTP docker container. Is one already running?"
Expand Down
24 changes: 11 additions & 13 deletions server/controllers/jobs_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,6 @@ func JobIndex(c *gin.Context) {
return
}
request.TemplateData["jobs"] = request.QueryResult.Jobs
// items := make([]JobListItem, len(jobs))
// for i, job := range jobs {
// artifacts, err := core.ArtifactNameIDList(job.ID)
// if err != nil {
// core.Dart.Log.Warningf("Error getting artifact list for job %s: %v", job.Name(), err)
// }
// item := JobListItem{
// Job: job,
// Artifacts: artifacts,
// }
// items[i] = item
// }
// request.TemplateData["items"] = items
c.HTML(http.StatusOK, "job/list.html", request.TemplateData)
}

Expand All @@ -71,3 +58,14 @@ func JobNew(c *gin.Context) {
}
c.Redirect(http.StatusFound, fmt.Sprintf("/jobs/files/%s", job.ID))
}

// GET /jobs/show_json/:id
func JobShowJson(c *gin.Context) {
jobID := c.Param("id")
result := core.ObjFind(jobID)
if result.Error != nil {
AbortWithErrorJSON(c, http.StatusInternalServerError, result.Error)
return
}
c.JSON(http.StatusOK, result.Job())
}
13 changes: 13 additions & 0 deletions server/controllers/jobs_controller_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controllers_test

import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -79,3 +80,15 @@ func TestJobDelete(t *testing.T) {
require.NotNil(t, result.Error)
assert.Equal(t, "sql: no rows in result set", result.Error.Error())
}

func TestJobShowJson(t *testing.T) {
defer core.ClearDartTable()
job := loadTestJob(t)
require.NoError(t, core.ObjSave(job))

data, err := json.Marshal(job)
require.Nil(t, err)
expected := []string{string(data)}

DoSimpleGetTest(t, fmt.Sprintf("/jobs/show_json/%s", job.ID), expected)
}
2 changes: 1 addition & 1 deletion server/controllers/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Request struct {

// Regex to help us extract object type from handler name.
// E.g. AppSettingIndex -> AppSetting, StorageServiceEdit -> StorageService.
var routeSuffix = regexp.MustCompile(`Index|New|Save|Edit|Delete|GetReport$`)
var routeSuffix = regexp.MustCompile(`Index|New|Save|Edit|Delete|GetReport|ShowJson$`)

func NewRequest(c *gin.Context) *Request {
pathAndQuery := c.Request.URL.Path
Expand Down
1 change: 1 addition & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func initRoutes(router *gin.Engine) {
router.POST("/jobs/delete_file/:id", controllers.JobDeleteFile)
router.GET("/jobs/summary/:id", controllers.JobRunShow)
router.GET("/jobs/run/:id", controllers.JobRunExecute)
router.GET("/jobs/show_json/:id", controllers.JobShowJson)

// Job Artifacts
router.GET("/jobs/artifacts/list/:job_id", controllers.JobArtifactsList)
Expand Down

0 comments on commit 9d7683c

Please sign in to comment.