Skip to content

Commit

Permalink
- add crc32c headers for 3.15
Browse files Browse the repository at this point in the history
- fix #327
  • Loading branch information
ddvk committed Nov 15, 2024
1 parent 87d79db commit 870c93a
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 54 deletions.
3 changes: 2 additions & 1 deletion dev.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ export RM_SMTP_NOTLS=1
export RM_SMTP_NOAUTH=1
export JWT_SECRET_KEY=dev
export LOGLEVEL=${1:-DEBUG}
export STORAGE_URL=http://$(hostname):3000
#export STORAGE_URL=http://$(hostname):3000
#export STORAGE_URL=http://pc.lan:3000
make runui &
PID=$!
trap "kill $PID ||:" EXIT
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/ddvk/rmfakecloud

go 1.17
go 1.23

require (
github.com/dropbox/dropbox-sdk-go-unofficial/v6 v6.0.5
Expand Down
108 changes: 85 additions & 23 deletions internal/app/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ func (app *App) syncUpdateRootV3(c *gin.Context) {
}

uid := c.GetString(userIDKey)
newgeneration, err := app.blobStorer.StoreBlob(uid, "root", bytes.NewBufferString(rootv3.Hash), rootv3.Generation)
newgeneration, err := app.blobStorer.StoreBlob(uid, RootHash, bytes.NewBufferString(rootv3.Hash), rootv3.Generation)
if err != nil {
log.Error(err)
c.AbortWithStatus(http.StatusInternalServerError)
Expand All @@ -759,24 +759,44 @@ func (app *App) syncUpdateRootV3(c *gin.Context) {
}

c.JSON(http.StatusOK, messages.SyncRootV3Response{
Generation: newgeneration,
Hash: rootv3.Hash,
SchemaVersion: SchemaVersion,
Generation: newgeneration,
Hash: rootv3.Hash,
})
}

const SchemaVersion = 3

func (app *App) syncGetRoot(c *gin.Context, newAccount func(*gin.Context)) {
uid := c.GetString(userIDKey)
const RmTokenTtlHeader = "Rm-Token-Ttl-Hint"
const RmFileHeader = "rm-filename"

const RootHash = "root"

reader, generation, _, err := app.blobStorer.LoadBlob(uid, "root")
// crcJSON calculates and ands the crc32c header
// TODO: fix it with a custom render or something
func crcJSON(c *gin.Context, status int, msg any) {
b, err := json.Marshal(msg)
if err != nil {
panic(err)
}

crc, err := common.CRC32FromReader(bytes.NewBuffer(b))
if err != nil {
panic(err)
}
common.AddCRCHeader(c, crc)
c.Data(status, "application/json", b)
}

func (app *App) syncGetRootV3(c *gin.Context) {
uid := c.GetString(userIDKey)
reader, generation, _, _, err := app.blobStorer.LoadBlob(uid, RootHash)
if err == fs.ErrorNotFound {
log.Warn("No root file found, assuming this is a new account")
newAccount(c)
c.JSON(http.StatusNotFound, gin.H{"message": "root not found"})
return
} else if err != nil {
}

if err != nil {
log.Error(err)
c.AbortWithStatus(http.StatusInternalServerError)
return
Expand All @@ -790,23 +810,38 @@ func (app *App) syncGetRoot(c *gin.Context, newAccount func(*gin.Context)) {
}

c.JSON(http.StatusOK, messages.SyncRootV3Response{
Generation: generation,
Hash: string(roothash),
SchemaVersion: SchemaVersion,
})
}

func (app *App) syncGetRootV3(c *gin.Context) {
app.syncGetRoot(c, func(c *gin.Context) {
c.JSON(http.StatusNotFound, gin.H{"message": "root not found"})
Generation: generation,
Hash: string(roothash),
})
}

func (app *App) syncGetRootV4(c *gin.Context) {
app.syncGetRoot(c, func(c *gin.Context) {
c.JSON(http.StatusOK, messages.SyncRootV3Response{
Generation: 0,
uid := c.GetString(userIDKey)
reader, generation, _, _, err := app.blobStorer.LoadBlob(uid, RootHash)
if err == fs.ErrorNotFound {
log.Warn("No root file found, assuming this is a new account")
crcJSON(c, http.StatusOK, messages.SyncRootV4Response{
SchemaVersion: SchemaVersion,
})
return
}

if err != nil {
log.Error(err)
c.AbortWithStatus(http.StatusInternalServerError)
return
}

roothash, err := io.ReadAll(reader)
if err != nil {
log.Error(err)
c.AbortWithStatus(http.StatusInternalServerError)
return
}
crcJSON(c, http.StatusOK, messages.SyncRootV4Response{
Generation: generation,
Hash: string(roothash),
SchemaVersion: SchemaVersion,
})
}

Expand Down Expand Up @@ -843,13 +878,14 @@ func (app *App) blobStorageRead(c *gin.Context) {
uid := c.GetString(userIDKey)
blobID := common.ParamS(fileKey, c)

reader, _, size, err := app.blobStorer.LoadBlob(uid, blobID)
reader, _, size, crc32c, err := app.blobStorer.LoadBlob(uid, blobID)
if err != nil {
log.Error(err)
c.AbortWithStatus(http.StatusInternalServerError)
return
}
defer reader.Close()
common.AddCRCHeader(c, crc32c)

c.DataFromReader(http.StatusOK, size, "application/octet-stream", reader, nil)
}
Expand All @@ -858,14 +894,19 @@ func (app *App) blobStorageWrite(c *gin.Context) {
uid := c.GetString(userIDKey)
blobID := common.ParamS(fileKey, c)

fileName := c.GetHeader(RmFileHeader)
hash := c.GetHeader(common.CRC32CHashHeader)
log.Debugf("TODO: check/save etc. write file '%s', hash '%s'", fileName, hash)

newgeneration, err := app.blobStorer.StoreBlob(uid, blobID, c.Request.Body, 0)
if err != nil {
log.Error(err)
c.AbortWithStatus(http.StatusInternalServerError)
return
}

c.JSON(http.StatusOK, messages.SyncRootV3Response{
//not checked by the client yet, but who knows
crcJSON(c, http.StatusOK, messages.SyncRootV4Response{
Generation: newgeneration,
Hash: string(blobID),
SchemaVersion: SchemaVersion,
Expand Down Expand Up @@ -1040,6 +1081,27 @@ func (app *App) connectWebSocket(c *gin.Context) {
go app.hub.ConnectWs(uid, deviceID, connection)
}

// syncReports reports sync errors back
func (app *App) syncReports(c *gin.Context) {
body, err := io.ReadAll(c.Request.Body)

if err != nil {
log.Warn("cant parse sync report, ignored")
c.Status(http.StatusOK)
return
}
log.Infof("got sync report: %s", string(body))
c.Status(http.StatusOK)
}

func (app *App) nullReport(c *gin.Context) {
_, err := io.ReadAll(c.Request.Body)
if err != nil {
log.Warn("could not read report data")
}
c.Status(http.StatusOK)
}

// / remove remarkable ads
func stripAds(msg string) string {
br := "<br>--<br>"
Expand Down
28 changes: 8 additions & 20 deletions internal/app/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (app *App) registerRoutes(router *gin.Engine) {
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"err": err.Error()})
return
}
log.Infof("endpoint %s", endpoint)
c.JSON(http.StatusOK, gin.H{
"Status": "OK",
"Host": endpoint,
Expand Down Expand Up @@ -80,27 +81,11 @@ func (app *App) registerRoutes(router *gin.Engine) {
c.Status(http.StatusOK)
})

router.POST("/analytics/v2/events", app.nullReport)
//some telemetry stuff from ping.
router.POST("/v1/reports", func(c *gin.Context) {
_, err := io.ReadAll(c.Request.Body)

if err != nil {
log.Warn("cant parse telemetry, ignored")
c.Status(http.StatusOK)
return
}
c.Status(http.StatusOK)
})
router.POST("/v2/reports", func(c *gin.Context) {
_, err := io.ReadAll(c.Request.Body)

if err != nil {
log.Warn("cant parse telemetry, ignored")
c.Status(http.StatusOK)
return
}
c.Status(http.StatusOK)
})
router.POST("/v1/reports", app.nullReport)
router.POST("/v2/reports", app.nullReport)
router.POST("/report/v1", app.nullReport)

//routes needing api authentitcation
authRoutes := router.Group("/")
Expand Down Expand Up @@ -159,5 +144,8 @@ func (app *App) registerRoutes(router *gin.Engine) {
authRoutes.GET("/sync/v3/missing", app.checkMissingBlob)

authRoutes.GET("/sync/v4/root", app.syncGetRootV4)

// reports
authRoutes.POST("/sync/reports/v1", app.syncReports)
}
}
33 changes: 33 additions & 0 deletions internal/common/common.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package common

import (
"encoding/base64"
"encoding/binary"
"errors"
"hash/crc32"
"io"
"regexp"
"strings"

Expand Down Expand Up @@ -62,3 +66,32 @@ func ParamS(param string, c *gin.Context) string {
p := c.Param(param)
return Sanitize(p)
}

var table = crc32.MakeTable(crc32.Castagnoli)

func CRC32FromReader(reader io.Reader) (string, error) {
// Create a table for CRC32C (Castagnoli polynomial)
// Create a CRC32C hasher
crc32c := crc32.New(table)

// Copy the reader data into the hasher
if _, err := io.Copy(crc32c, reader); err != nil {
return "", err
}

// Compute the CRC32C checksum
checksum := crc32c.Sum32()

// Convert the checksum to a byte array
crcBytes := make([]byte, 4)
binary.BigEndian.PutUint32(crcBytes, checksum)
encodedChecksum := base64.StdEncoding.EncodeToString(crcBytes)

return encodedChecksum, nil
}

const CRC32CHashHeader = "x-goog-hash"

func AddCRCHeader(c *gin.Context, crc string) {
c.Header(CRC32CHashHeader, "crc32c="+crc)
}
6 changes: 6 additions & 0 deletions internal/messages/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ type SyncRootV3Request struct {

// SyncRootV3Response
type SyncRootV3Response struct {
Generation int64 `json:"generation"`
Hash string `json:"hash,omitempty"`
}

// SyncRootV4Response
type SyncRootV4Response struct {
Generation int64 `json:"generation"`
Hash string `json:"hash,omitempty"`
SchemaVersion int64 `json:"schemaVersion"`
Expand Down
4 changes: 3 additions & 1 deletion internal/storage/fs/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (app *App) downloadBlob(c *gin.Context) {

log.Info("Requestng blob: ", blobID)

reader, generation, size, err := app.fs.LoadBlob(uid, blobID)
reader, generation, size, crc32c, err := app.fs.LoadBlob(uid, blobID)
if err != nil {
if err == ErrorNotFound {
c.AbortWithStatus(http.StatusNotFound)
Expand All @@ -163,6 +163,8 @@ func (app *App) downloadBlob(c *gin.Context) {
}
defer reader.Close()

common.AddCRCHeader(c, crc32c)

if blobID == rootBlob {
log.Debug("Sending gen for root: ", generation)
c.Header(generationHeader, strconv.FormatInt(generation, 10))
Expand Down
22 changes: 17 additions & 5 deletions internal/storage/fs/blobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func (fs *FileSystemStorage) GetBlobURL(uid, blobid string, write bool) (docurl
}

// LoadBlob Opens a blob by id
func (fs *FileSystemStorage) LoadBlob(uid, blobid string) (reader io.ReadCloser, gen int64, size int64, err error) {
func (fs *FileSystemStorage) LoadBlob(uid, blobid string) (reader io.ReadCloser, gen int64, size int64, crc32 string, err error) {
generation := int64(0)
blobPath := path.Join(fs.getUserBlobPath(uid), common.Sanitize(blobid))
log.Debugln("Fullpath:", blobPath)
Expand All @@ -464,7 +464,7 @@ func (fs *FileSystemStorage) LoadBlob(uid, blobid string) (reader io.ReadCloser,
err := lock.LockWithTimeout(time.Duration(time.Second * 5))
if err != nil {
log.Error("cannot obtain lock")
return nil, 0, 0, err
return nil, 0, 0, "", err
}
defer lock.Unlock()

Expand All @@ -476,11 +476,23 @@ func (fs *FileSystemStorage) LoadBlob(uid, blobid string) (reader io.ReadCloser,

fi, err := os.Stat(blobPath)
if err != nil || fi.IsDir() {
return nil, generation, 0, ErrorNotFound
return nil, generation, 0, "", ErrorNotFound
}

reader, err = os.Open(blobPath)
return reader, generation, fi.Size(), err
osFile, err := os.Open(blobPath)
//TODO: cache the crc32
crc32, err = common.CRC32FromReader(osFile)
if err != nil {
log.Errorf("cannot get crc32 hash %v", err)
return
}
_, err = osFile.Seek(0, 0)
if err != nil {
log.Errorf("cannot rewind file %v", err)
return
}
reader = osFile
return reader, generation, fi.Size(), crc32, err
}

// StoreBlob stores a document
Expand Down
4 changes: 2 additions & 2 deletions internal/storage/fs/localblobstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type LocalBlobStorage struct {

// GetRootIndex the hash of the root index
func (p *LocalBlobStorage) GetRootIndex() (hash string, gen int64, err error) {
r, gen, _, err := p.fs.LoadBlob(p.uid, rootBlob)
r, gen, _, _, err := p.fs.LoadBlob(p.uid, rootBlob)
if err == ErrorNotFound {
log.Info("root not found")
return "", gen, nil
Expand All @@ -40,7 +40,7 @@ func (p *LocalBlobStorage) WriteRootIndex(generation int64, roothash string) (in

// GetReader reader for a given hash
func (p *LocalBlobStorage) GetReader(hash string) (io.ReadCloser, error) {
r, _, _, err := p.fs.LoadBlob(p.uid, hash)
r, _, _, _, err := p.fs.LoadBlob(p.uid, hash)
return r, err
}

Expand Down
2 changes: 1 addition & 1 deletion internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type BlobStorage interface {
GetBlobURL(uid, docid string, write bool) (string, time.Time, error)

StoreBlob(uid, blobID string, s io.Reader, matchGeneration int64) (int64, error)
LoadBlob(uid, blobID string) (reader io.ReadCloser, gen int64, size int64, err error)
LoadBlob(uid, blobID string) (reader io.ReadCloser, gen int64, size int64, crc32c string, err error)
CreateBlobDocument(uid, name, parent string, stream io.Reader) (doc *Document, err error)
}

Expand Down

0 comments on commit 870c93a

Please sign in to comment.