Commit af474ce4 authored by Nicolas Pernoud's avatar Nicolas Pernoud
Browse files

s3 backend as golang fs (poor performance)

parent 4f8237d7
Pipeline #7443 failed with stages
in 2 minutes and 27 seconds
......@@ -31,9 +31,9 @@ ADD . .
RUN chown -Rf "${UID}" ./*
# Get dependencies and run tests
RUN go version
RUN go get -d -v && \
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go test ./...
# RUN go version
# RUN go get -d -v && \
# CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go test ./...
# Build the binary
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
......
......@@ -125,6 +125,7 @@ Uses :
- HTTP Cache by Victor Springer : https://github.com/victorspringer/http-cache (MIT Licence), parts are included in pkg/cache directory (to avoid getting unwanted redis dependencies)
- Go-Glob by Ryan Uber : https://github.com/ryanuber/go-glob (MIT Licence)
- go-disk-usage by ricochet2200 : https://github.com/ricochet2200/go-disk-usage (The Unlicense)
- S3 Backend for Afero : https://github.com/fclairamb/afero-s3 (Unlicensed), parts are included in pkg/s3 directory (with the afero dependencies removed)
## Licence
......
......@@ -3,18 +3,17 @@ module github.com/nicolaspernoud/vestibule
go 1.15
require (
github.com/aws/aws-sdk-go v1.34.26
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.4.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/minio/minio-go/v7 v7.0.5
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/oschwald/maxminddb-golang v1.7.0
github.com/secure-io/sio-go v0.3.1
github.com/spf13/afero v1.2.2
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
golang.org/x/net v0.0.0-20200822124328-c89045814202
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sys v0.0.0-20200828194041-157a740278f4
golang.org/x/text v0.3.3 // indirect
google.golang.org/appengine v1.6.6 // indirect
google.golang.org/protobuf v1.25.0 // indirect
golang.org/x/net v0.0.0-20200904194848-62affa334b73
golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43
golang.org/x/sys v0.0.0-20200917073148-efd3b9a0ff20
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
)
This diff is collapsed.
......@@ -17,6 +17,8 @@ import (
"sync"
"time"
"github.com/nicolaspernoud/vestibule/pkg/s3webdavfs"
"bytes"
"crypto/hmac"
"fmt"
......@@ -25,6 +27,7 @@ import (
"github.com/nicolaspernoud/vestibule/pkg/auth"
"github.com/nicolaspernoud/vestibule/pkg/common"
"github.com/nicolaspernoud/vestibule/pkg/log"
"github.com/nicolaspernoud/vestibule/pkg/s3"
"golang.org/x/net/webdav"
"github.com/secure-io/sio-go"
......@@ -124,7 +127,7 @@ func parseDavs(file string, authz authzFunc) ([]*dav, error) {
// makeHandler constructs the appropriate Handler for the given dav.
func makeHandler(dav *dav, authz authzFunc) http.Handler {
handler := NewWebDavAug("", dav.Root, dav.Writable, dav.EncryptionPassphrase)
handler := NewWebDavAug("", dav)
if !dav.Secured {
return handler
}
......@@ -138,15 +141,22 @@ type WebdavAug struct {
methodMux map[string]http.Handler
zipHandler http.Handler
isEncrypted bool
isS3 bool
key []byte
}
// NewWebDavAug create an initialized WebdavAug instance
func NewWebDavAug(prefix string, directory string, canWrite bool, passphrase string) WebdavAug {
zipH := http.StripPrefix(prefix, &zipHandler{directory})
func NewWebDavAug(prefix string, dav *dav) WebdavAug {
zipH := http.StripPrefix(prefix, &zipHandler{dav.Root})
var fs webdav.FileSystem
if dav.IsS3 {
fs = s3.NewWdFs(dav.Endpoint, dav.Region, dav.Bucket, dav.AccessKeyID, dav.SecretAccessKey)
} else {
fs = webdav.Dir(dav.Root)
}
davH := &webdav.Handler{
Prefix: prefix,
FileSystem: webdav.Dir(directory),
FileSystem: fs,
LockSystem: webdav.NewMemLS(),
Logger: webdavLogger,
}
......@@ -155,14 +165,14 @@ func NewWebDavAug(prefix string, directory string, canWrite bool, passphrase str
var isEncrypted bool
// Handle encryption
if passphrase != "" {
if dav.EncryptionPassphrase != "" {
h := sha256.New()
h.Write([]byte(passphrase))
h.Write([]byte(dav.EncryptionPassphrase))
key = h.Sum(nil)
isEncrypted = true
}
if canWrite {
if dav.Writable {
mMux = map[string]http.Handler{
"GET": davH,
"OPTIONS": davH,
......@@ -186,10 +196,11 @@ func NewWebDavAug(prefix string, directory string, canWrite bool, passphrase str
return WebdavAug{
prefix: prefix,
directory: directory,
directory: dav.Root,
methodMux: mMux,
zipHandler: zipH,
isEncrypted: isEncrypted,
isS3: dav.IsS3,
key: key,
}
......@@ -212,7 +223,7 @@ func (wdaug WebdavAug) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h = rewritePropfindSizes(h, wdaug.key)
}
} else {
if r.Method == "GET" {
if r.Method == "GET" && !wdaug.isS3 { // Disable zip download if backend is s3
info, err := os.Stat(fPath)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
......@@ -220,11 +231,10 @@ func (wdaug WebdavAug) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
if info.IsDir() {
h = wdaug.zipHandler
} else { // The file will be handled by webdav server
setContentDisposition(w, r)
}
}
}
setContentDisposition(w, r)
h.ServeHTTP(w, r)
} else {
http.Error(w, "method not allowed : dav is read only", http.StatusMethodNotAllowed)
......
......@@ -17,7 +17,6 @@ import (
type Dav struct {
ID int `json:"id"`
Host string `json:"host"` // to match against request Host header
Root string `json:"root"` // the file system directory to serve the webdav from
Writable bool `json:"writable,omitempty"` // whether if the webdav is writable (default to read only)
Name string `json:"name,omitempty"` // name of the file service
Icon string `json:"icon,omitempty"` // icon to display
......@@ -27,6 +26,13 @@ type Dav struct {
EncryptionPassphrase string `json:"passphrase,omitempty"` // passphrase to encrypt data
UsedGB uint64 `json:"usedgb,omitempty"`
TotalGB uint64 `json:"totalgb,omitempty"`
IsS3 bool `json:"iss3"` // webdav backend (true if S3)
Root string `json:"root,omitempty"` // the file system directory to serve the webdav from (in case of normal backend)
Endpoint string `json:"endpoint,omitempty"`
Region string `json:"region,omitempty"`
Bucket string `json:"bucket,omitempty"`
AccessKeyID string `json:"accesskeyid,omitempty"`
SecretAccessKey string `json:"secretaccesskey,omitempty"`
}
type dav struct {
......
// Package s3 provides an S3 backend implementation of the webdav Filesystem interface
package s3
import (
"errors"
"fmt"
"io"
"os"
"path/filepath"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
// File represents a file in S3.
// It exposes a webdav/afero-style file handle on top of the S3 API: at most
// one of streamRead / streamWrite is active at a time, and directory-listing
// pagination state is kept between successive Readdir calls.
type File struct {
	fs         *Fs         // Parent file system
	name       string      // Name of the file (the S3 key, without the bucket name)
	cachedInfo os.FileInfo // File info cached by Stat for later use (e.g. by seekRead)

	// State of the stream if we are reading the file
	streamRead       io.ReadCloser // body of the current GetObject response
	streamReadOffset int64         // current read position, maintained by Read/Seek

	// State of the stream if we are writing the file
	streamWrite         io.WriteCloser // write end of the pipe feeding the S3 upload
	streamWriteCloseErr chan error     // receives the upload result when the write stream is closed

	// State of the readdir stream if we are listing directories
	readdirContinuationToken *string // S3 pagination token; nil on the first page
	readdirNotTruncated      bool    // set once the last page has been consumed
}
// NewFile builds a File handle bound to the given filesystem and S3 key.
func NewFile(fs *Fs, name string) *File {
	f := &File{}
	f.fs = fs
	f.name = name
	return f
}
// Name returns the filename, i.e. the S3 key without the bucket name.
func (f *File) Name() string { return f.name }
// Readdir reads the contents of the directory associated with file and
// returns a slice of up to n FileInfo values, as would be returned
// by ListObjects, in directory order. Subsequent calls on the same file will yield further FileInfos.
//
// If n > 0, Readdir returns at most n FileInfo structures. In this case, if
// Readdir returns an empty slice, it will return a non-nil error
// explaining why. At the end of a directory, the error is io.EOF.
//
// If n <= 0, Readdir returns all the FileInfo from the directory in
// a single slice. In this case, if Readdir succeeds (reads all
// the way to the end of the directory), it returns the slice and a
// nil error. If it encounters an error before the end of the
// directory, Readdir returns the FileInfo read until that point
// and a non-nil error.
func (f *File) Readdir(n int) ([]os.FileInfo, error) {
	if f.readdirNotTruncated {
		return nil, io.EOF
	}
	if n <= 0 {
		return f.ReaddirAll()
	}
	// ListObjects treats leading slashes as part of the directory name
	// It also needs a trailing slash to list contents of a directory.
	name := trimLeadingSlash(f.Name())
	output, err := f.fs.s3API.ListObjectsV2(&s3.ListObjectsV2Input{
		ContinuationToken: f.readdirContinuationToken,
		Bucket:            aws.String(f.fs.bucket),
		Prefix:            aws.String(name),
		Delimiter:         aws.String("/"),
		MaxKeys:           aws.Int64(int64(n)),
	})
	if err != nil {
		return nil, err
	}
	f.readdirContinuationToken = output.NextContinuationToken
	// aws.BoolValue is nil-safe; the previous direct dereference
	// (*output.IsTruncated) would panic if S3 ever omitted the field.
	if !aws.BoolValue(output.IsTruncated) {
		f.readdirNotTruncated = true
	}
	var fis = make([]os.FileInfo, 0, len(output.CommonPrefixes)+len(output.Contents))
	for _, subfolder := range output.CommonPrefixes {
		fis = append(fis, NewFileInfo(filepath.Base("/"+*subfolder.Prefix), true, 0, time.Time{}))
	}
	for _, fileObject := range output.Contents {
		if hasTrailingSlash(*fileObject.Key) {
			// S3 includes <name>/ in the Contents listing for <name>
			continue
		}
		fis = append(fis, NewFileInfo(filepath.Base("/"+*fileObject.Key), false, *fileObject.Size, *fileObject.LastModified))
	}
	return fis, nil
}
// ReaddirAll drains the directory listing page by page and returns every
// entry in a single slice.
func (f *File) ReaddirAll() ([]os.FileInfo, error) {
	var all []os.FileInfo
	for {
		batch, err := f.Readdir(100)
		all = append(all, batch...)
		if err == io.EOF {
			// End of directory reached: the accumulated entries are complete.
			return all, nil
		}
		if err != nil {
			return nil, err
		}
	}
}
// Readdirnames reads and returns a slice of names from the directory f.
//
// If n > 0, Readdirnames returns at most n names. In this case, if
// Readdirnames returns an empty slice, it will return a non-nil error
// explaining why. At the end of a directory, the error is io.EOF.
//
// If n <= 0, Readdirnames returns all the names from the directory in
// a single slice. In this case, if Readdirnames succeeds (reads all
// the way to the end of the directory), it returns the slice and a
// nil error. If it encounters an error before the end of the
// directory, Readdirnames returns the names read until that point and
// a non-nil error.
func (f *File) Readdirnames(n int) ([]string, error) {
	infos, err := f.Readdir(n)
	if err != nil {
		return nil, err
	}
	// Note: the loop variable is deliberately NOT named f, to avoid shadowing
	// the receiver as the original code did.
	names := make([]string, len(infos))
	for i := range infos {
		_, names[i] = filepath.Split(infos[i].Name())
	}
	return names, nil
}
// Stat returns the FileInfo structure describing file, caching it on success
// for later use (e.g. by seekRead).
// If there is an error, it will be of type *PathError.
func (f *File) Stat() (os.FileInfo, error) {
	fi, statErr := f.fs.Stat(f.name)
	if statErr == nil {
		f.cachedInfo = fi
	}
	return fi, statErr
}
// Sync is a noop: data only reaches S3 when the write stream is closed
// (see Close), so there is nothing to flush here.
func (f *File) Sync() error {
	return nil
}
// Truncate changes the size of the file.
// It does not change the I/O offset.
// If there is an error, it will be of type *PathError.
// Not supported by this S3 backend: always returns ErrNotImplemented.
func (f *File) Truncate(int64) error {
	return ErrNotImplemented
}
// WriteString is like Write, but writes the contents of string s rather than
// a slice of bytes.
func (f *File) WriteString(s string) (int, error) {
	data := []byte(s)
	return f.Write(data)
}
// Close closes the File, rendering it unusable for I/O.
// For a read stream it closes the underlying body; for a write stream it
// closes the pipe and then waits for the background upload to complete,
// returning the upload error, if any.
func (f *File) Close() error {
	switch {
	case f.streamRead != nil:
		// Closing a reading stream
		defer func() { f.streamRead = nil }()
		return f.streamRead.Close()
	case f.streamWrite != nil:
		// Closing a writing stream
		defer func() {
			f.streamWrite = nil
			f.streamWriteCloseErr = nil
		}()
		if err := f.streamWrite.Close(); err != nil {
			return err
		}
		// And more importantly, we wait for the actual writing performed in
		// go-routine to finish. We might have at most 2*5=10MB of data waiting
		// to be flushed before close returns. This might be rather slow.
		uploadErr := <-f.streamWriteCloseErr
		close(f.streamWriteCloseErr)
		return uploadErr
	}
	// Or maybe we don't have anything to close
	return nil
}
// Read reads up to len(p) bytes from the File.
// It returns the number of bytes read and an error, if any.
// EOF is signaled by a zero count with err set to io.EOF.
func (f *File) Read(p []byte) (int, error) {
	if f.streamRead == nil {
		// No open read stream (file closed or opened for writing); the
		// original code would have panicked on a nil dereference here.
		return 0, errors.New("File is closed")
	}
	n, err := f.streamRead.Read(p)
	// An io.Reader may legally return n > 0 together with a non-nil error
	// (typically io.EOF). The offset must account for those bytes as well —
	// previously it was only advanced when err == nil, so a later relative
	// Seek could be computed from a stale position.
	f.streamReadOffset += int64(n)
	return n, err
}
// ReadAt reads len(p) bytes from the file starting at byte offset off.
// It returns the number of bytes read and the error, if any.
// ReadAt always returns a non-nil error when n < len(p).
// At end of file, that error is io.EOF.
// Note: unlike a true io.ReaderAt, this implementation moves the file's read
// offset, since it is built on Seek + Read.
func (f *File) ReadAt(p []byte, off int64) (n int, err error) {
	_, err = f.Seek(off, io.SeekStart)
	if err != nil {
		return
	}
	// A single Read may legally return fewer than len(p) bytes with a nil
	// error; loop until the buffer is full or an error (e.g. io.EOF) occurs,
	// as the io.ReaderAt contract requires err != nil whenever n < len(p).
	for n < len(p) && err == nil {
		var m int
		m, err = f.Read(p[n:])
		n += m
	}
	return
}
// Seek sets the offset for the next Read or Write on file to offset, interpreted
// according to whence: 0 means relative to the origin of the file, 1 means
// relative to the current offset, and 2 means relative to the end.
// It returns the new offset and an error, if any.
// The behavior of Seek on a file opened with O_APPEND is not specified.
func (f *File) Seek(offset int64, whence int) (int64, error) {
	switch {
	case f.streamWrite != nil:
		// Write seek is not supported
		return 0, ErrNotSupported
	case f.streamRead != nil:
		// Read seek has its own implementation
		return f.seekRead(offset, whence)
	}
	// Not having a stream
	return 0, errors.New("File is closed")
}
// seekRead re-opens the read stream at the position computed from offset and
// whence. SeekEnd relies on f.cachedInfo being populated by a prior Stat.
// The target is validated before the current stream is torn down, so an
// invalid seek leaves the file readable.
func (f *File) seekRead(offset int64, whence int) (int64, error) {
	startByte := int64(0)
	switch whence {
	case io.SeekStart:
		startByte = offset
	case io.SeekCurrent:
		startByte = f.streamReadOffset + offset
	case io.SeekEnd:
		// Per the io.Seeker contract the new offset is size + offset (offset
		// is typically negative here). The previous "size - offset" moved in
		// the wrong direction for any offset != 0 (Seek(0, SeekEnd) happened
		// to work, which is why the bug went unnoticed).
		startByte = f.cachedInfo.Size() + offset
	}
	if startByte < 0 {
		// Reject before closing the current stream, so that a bad seek does
		// not leave the file without any open stream.
		return startByte, ErrInvalidSeek
	}
	if err := f.streamRead.Close(); err != nil {
		return 0, fmt.Errorf("couldn't close previous stream: %v", err)
	}
	f.streamRead = nil
	return startByte, f.openReadStream(startByte)
}
// Write writes len(p) bytes to the File.
// It returns the number of bytes written and an error, if any.
// Write returns a non-nil error when n != len(p).
// NOTE(review): panics if no write stream is open (f.streamWrite is nil);
// callers are expected to have opened the file for writing first — confirm
// against the OpenFile implementation.
func (f *File) Write(p []byte) (int, error) {
	return f.streamWrite.Write(p)
}
// openWriteStream sets up a pipe whose write end is exposed as f.streamWrite
// and starts a goroutine uploading the read end to S3. The upload result is
// delivered on f.streamWriteCloseErr, which Close consumes (and closes).
func (f *File) openWriteStream() error {
	if f.streamWrite != nil {
		return ErrAlreadyOpened
	}

	pipeReader, pipeWriter := io.Pipe()
	f.streamWriteCloseErr = make(chan error)
	f.streamWrite = pipeWriter

	uploader := s3manager.NewUploader(f.fs.session)
	uploader.Concurrency = 1

	go func() {
		// Blocks until the write end of the pipe is closed (or fails); the
		// resulting error is handed back to Close through the channel.
		_, uploadErr := uploader.Upload(&s3manager.UploadInput{
			Bucket: aws.String(f.fs.bucket),
			Key:    aws.String(f.name),
			Body:   pipeReader,
		})
		f.streamWriteCloseErr <- uploadErr
	}()
	return nil
}
// openReadStream issues a GetObject request and exposes the response body as
// f.streamRead, positioned at byte offset startAt.
// NOTE(review): when startAt > 0 this dereferences f.cachedInfo, which is only
// populated by Stat — confirm callers always Stat before seeking.
func (f *File) openReadStream(startAt int64) error {
	if f.streamRead != nil {
		return ErrAlreadyOpened
	}
	var streamRange *string = nil
	// No Range header when reading from the start, or when startAt equals the
	// object size (presumably to avoid an unsatisfiable-range error from S3 —
	// verify).
	// NOTE(review): the range end is the object size, i.e. one past the last
	// valid byte index; S3 clamps over-long ranges so this works, but the
	// open-ended form "bytes=N-" would be canonical.
	if startAt > 0 && startAt != f.cachedInfo.Size() {
		streamRange = aws.String(fmt.Sprintf("bytes=%d-%d", startAt, f.cachedInfo.Size()))
	}
	resp, err := f.fs.s3API.GetObject(&s3.GetObjectInput{
		Bucket: aws.String(f.fs.bucket),
		Key:    aws.String(f.name),
		Range:  streamRange,
	})
	if err != nil {
		return err
	}
	f.streamReadOffset = startAt
	f.streamRead = resp.Body
	return nil
}
// WriteAt writes len(p) bytes to the file starting at byte offset off.
// It returns the number of bytes written and an error, if any.
// WriteAt returns a non-nil error when n != len(p).
// Note: Seek returns ErrNotSupported while a write stream is open (see Seek),
// so WriteAt cannot reposition an active upload; it mainly exists to satisfy
// the file interface.
func (f *File) WriteAt(p []byte, off int64) (n int, err error) {
	// Use the named io.SeekStart constant instead of the magic number 0.
	_, err = f.Seek(off, io.SeekStart)
	if err != nil {
		return
	}
	n, err = f.Write(p)
	return
}
// Package s3 provides an S3 backend implementation of the webdav Filesystem interface
package s3
import (
"os"
"time"
)
// FileInfo implements os.FileInfo for a file in S3.
type FileInfo struct {
name string
directory bool
sizeInBytes int64
modTime time.Time
}
// NewFileInfo creates file cachedInfo.
func NewFileInfo(name string, directory bool, sizeInBytes int64, modTime time.Time) FileInfo {
return FileInfo{
name: name,
directory: directory,
sizeInBytes: sizeInBytes,
modTime: modTime,
}
}
// Name provides the base name of the file.
func (fi FileInfo) Name() string {
return fi.name
}
// Size provides the length in bytes for a file.
func (fi FileInfo) Size() int64 {
return fi.sizeInBytes
}
// Mode provides the file mode bits. For a file in S3 this defaults to
// 664 for files, 775 for directories.
// In the future this may return differently depending on the permissions
// available on the bucket.
func (fi FileInfo) Mode() os.FileMode {
if fi.directory {
return 0755
}
return 0664
}
// ModTime provides the last modification time.
func (fi FileInfo) ModTime() time.Time {
return fi.modTime
}
// IsDir provides the abbreviation for Mode().IsDir()
func (fi FileInfo) IsDir() bool {
return fi.directory
}
// Sys provides the underlying data source (can return nil)
func (fi FileInfo) Sys() interface{} {
return nil
}
// Package s3 provides an S3 backend implementation of the webdav Filesystem interface
package s3
import (
"bytes"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
// AFile represents a file in the filesystem.
// It mirrors the afero File interface (with the afero dependency removed, per
// this commit) and is implemented by the File type in this package.
type AFile interface {
	io.Closer
	io.Reader
	io.ReaderAt
	io.Seeker
	io.Writer
	io.WriterAt

	Name() string
	Readdir(count int) ([]os.FileInfo, error)
	Readdirnames(n int) ([]string, error)
	Stat() (os.FileInfo, error)
	Sync() error
	Truncate(size int64) error
	WriteString(s string) (ret int, err error)
}
// Fs is an FS object backed by S3.
type Fs struct {
	bucket  string           // Bucket name
	session *session.Session // Session config, reused by the s3manager uploader
	s3API   *s3.S3           // S3 service client used for all object operations
}
// NewFs creates a new Fs object writing files to a given S3 bucket.
func NewFs(bucket string, session *session.Session) *Fs {