remove prefix, add remote storage

This commit is contained in:
2025-04-27 01:49:21 +03:00
parent 8398869b01
commit e777e25d87
10 changed files with 759 additions and 37 deletions
+23
View File
@@ -0,0 +1,23 @@
package remote
import (
"errors"
"strings"
)
var (
ErrNoScheme = errors.New("no scheme")
ErrEmptyURL = errors.New("URL cannot be empty")
)
// schemeFromURL returns the scheme from a URL string
func schemeFromURL(url string) (string, error) {
if url == "" {
return "", ErrEmptyURL
}
i := strings.Index(url, ":")
if i < 1 {
return "", ErrNoScheme
}
return url[:i], nil
}
+299
View File
@@ -0,0 +1,299 @@
package miniostorage
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"os"
"sort"
"syscall"
"github.com/minio/minio-go/v7"
)
var (
ErrOutOfRange = errors.New("out of range")
ErrNotSupported = errors.New("doesn't support this operation")
)
var _ http.File = (*MinioFile)(nil)
type MinioFile struct {
openFlags int
offset int64
closed bool
resource *minioFileResource
}
// NewMinioFile
func NewMinioFile(ctx context.Context, storage *MinioStorage, openFlags int, fileMode os.FileMode, name string) *MinioFile {
return &MinioFile{
openFlags: openFlags,
// offset: 0,
// closed: false,
resource: &minioFileResource{
ctx: ctx,
storage: storage,
name: name,
fileMode: fileMode,
currentIoSize: 0,
offset: 0,
reader: nil,
writer: nil,
},
}
}
// Close
func (f *MinioFile) Close() error {
if f.closed {
return os.ErrClosed
}
f.closed = true
return f.resource.Close()
}
// Read
func (f *MinioFile) Read(p []byte) (int, error) {
if f.closed {
return 0, os.ErrClosed
}
readed, err := f.resource.ReadAt(p, f.offset)
f.offset += int64(readed)
return readed, err
}
// Seek
func (f *MinioFile) Seek(offset int64, whence int) (int64, error) {
if f.closed {
return 0, os.ErrClosed
}
// Since this is an expensive operation; let's make sure we need it
if (whence == 0 && offset == f.offset) || (whence == 1 && offset == 0) {
return f.offset, nil
}
// Fore the reader/writers to be reopened (at correct offset)
if err := f.Sync(); err != nil {
return 0, err
}
stat, err := f.Stat()
if err != nil {
return 0, nil
}
switch whence {
case io.SeekStart:
f.offset = offset
case io.SeekCurrent:
f.offset += offset
case io.SeekEnd:
f.offset = stat.Size() + offset
}
return f.offset, nil
}
// Write
func (f *MinioFile) Write(p []byte) (int, error) {
return f.WriteAt(p, f.offset)
}
// WriteAt
func (f *MinioFile) WriteAt(b []byte, off int64) (int, error) {
if f.closed {
return 0, os.ErrClosed
}
if f.openFlags&os.O_RDONLY != 0 {
return 0, fmt.Errorf("file is opened as read only")
}
written, err := f.resource.WriteAt(b, off)
f.offset += int64(written)
return written, err
}
// readdirImpl
func (f *MinioFile) readdirImpl(count int) ([]*MinioFileInfo, error) {
if err := f.Sync(); err != nil {
return nil, err
}
ownInfo, err := f.Stat()
if err != nil {
return nil, err
}
if !ownInfo.IsDir() {
return nil, syscall.ENOTDIR
}
var res []*MinioFileInfo
objs := f.resource.storage.client.ListObjects(f.resource.ctx, f.resource.storage.bucket, minio.ListObjectsOptions{
Recursive: true,
Prefix: f.resource.name,
})
for obj := range objs {
tmp := NewFileInfoFromAttrs(obj, f.resource.fileMode)
if tmp.Name() == "" {
// neither object.Name, not object.Prefix were present - so let's skip this unknown thing
continue
}
res = append(res, tmp)
}
if count > 0 && len(res) > 0 {
sort.Sort(ByName(res))
res = res[:count]
}
return res, nil
}
// Readdir
func (f *MinioFile) Readdir(count int) ([]fs.FileInfo, error) {
fi, err := f.readdirImpl(count)
if err != nil {
return nil, err
}
var res []fs.FileInfo
for _, v := range fi {
res = append(res, v)
}
return res, nil
}
// Readdirnames
func (f *MinioFile) Readdirnames(n int) ([]string, error) {
fi, err := f.Readdir(n)
if err != nil && err != io.EOF {
return nil, err
}
names := make([]string, len(fi))
for i, v := range fi {
names[i] = v.Name()
}
return names, err
}
// Stat
func (f *MinioFile) Stat() (os.FileInfo, error) {
if err := f.Sync(); err != nil {
return nil, err
}
stat, err := f.resource.storage.client.StatObject(f.resource.ctx, f.resource.storage.bucket, f.resource.name, minio.StatObjectOptions{})
if err != nil {
return nil, err
}
return NewFileInfoFromAttrs(stat, f.resource.fileMode), nil
}
// Sync
func (f *MinioFile) Sync() error {
return f.resource.maybeCloseIo()
}
// Truncate
func (f *MinioFile) Truncate(_ int64) error {
return ErrNotSupported
}
// WriteString
func (f *MinioFile) WriteString(s string) (int, error) {
return f.Write([]byte(s))
}
type readerAtCloser interface {
io.ReadCloser
io.ReaderAt
}
type minioFileResource struct {
ctx context.Context
storage *MinioStorage
name string
fileMode os.FileMode
currentIoSize int64
offset int64
reader readerAtCloser
writer io.WriteCloser
closed bool
}
// Close
func (r *minioFileResource) Close() error {
r.closed = true
return r.maybeCloseIo()
}
// maybeCloseIo
func (r *minioFileResource) maybeCloseIo() error {
if r.reader != nil {
if err := r.reader.Close(); err != nil {
return fmt.Errorf("error closing reader: %v", err)
}
r.reader = nil
}
if r.writer != nil {
if err := r.writer.Close(); err != nil {
return fmt.Errorf("error closing writer: %v", err)
}
r.writer = nil
}
return nil
}
// ReadAt
func (r *minioFileResource) ReadAt(p []byte, offset int64) (int, error) {
if cap(p) == 0 {
return 0, nil
}
// Assume that if the reader is open; it is at the correct offset a good performance assumption that we must ensure holds
if offset == r.offset && r.reader != nil {
readed, err := r.reader.ReadAt(p, offset)
r.offset += int64(readed)
return readed, err
}
// If any writers have written anything; commit it first so we can read it back.
if err := r.maybeCloseIo(); err != nil {
return 0, err
}
obj, err := r.storage.client.GetObject(r.ctx, r.storage.bucket, r.name, minio.GetObjectOptions{})
if err != nil {
return 0, err
}
r.reader = obj
r.offset = offset
readed, err := obj.ReadAt(p, offset)
r.offset += int64(readed)
return readed, err
}
// WriteAt
func (r *minioFileResource) WriteAt(b []byte, offset int64) (int, error) {
// If the writer is opened and at the correct offset we're good!
if offset == r.offset && r.writer != nil {
written, err := r.writer.Write(b)
r.offset += int64(written)
return written, err
}
// Ensure readers must be re-opened and that if a writer is active at another offset it is first committed before we do a "seek" below
if err := r.maybeCloseIo(); err != nil {
return 0, err
}
// WriteAt to a non existing file
if offset > r.currentIoSize {
return 0, ErrOutOfRange
}
r.offset = offset
buffer := bytes.NewReader(b)
opts := minio.PutObjectOptions{
ContentType: http.DetectContentType(b),
}
if offset > 0 {
opts.PartSize = uint64(offset)
opts.NumThreads = 8
opts.ConcurrentStreamParts = false
opts.DisableMultipart = true
}
if _, err := r.storage.client.PutObject(r.ctx, r.storage.bucket, r.name, buffer, buffer.Size(), opts); err != nil {
return 0, err
}
r.offset += int64(buffer.Len())
return buffer.Len(), nil
}
+96
View File
@@ -0,0 +1,96 @@
package miniostorage
import (
"io/fs"
"os"
"path/filepath"
"strings"
"time"
"github.com/minio/minio-go/v7"
)
const folderSize = 42
var _ fs.FileInfo = (*MinioFileInfo)(nil)
type MinioFileInfo struct {
ETag string
name string
size int64
updated time.Time
isDir bool
fileMode os.FileMode
}
// NewFileInfoFromAttrs
func NewFileInfoFromAttrs(obj minio.ObjectInfo, fileMode os.FileMode) *MinioFileInfo {
res := &MinioFileInfo{
ETag: obj.ETag,
name: obj.Key,
size: obj.Size,
updated: obj.LastModified,
isDir: false,
fileMode: fileMode,
}
if res.name == "" {
// deals with them at the moment
//res.name = "folder"
res.size = folderSize
res.isDir = true
}
return res
}
// Name
func (fi *MinioFileInfo) Name() string {
return filepath.Base(filepath.FromSlash(fi.name))
}
// Size
func (fi *MinioFileInfo) Size() int64 {
return fi.size
}
// Mode
func (fi *MinioFileInfo) Mode() os.FileMode {
if fi.IsDir() {
return os.ModeDir | fi.fileMode
}
return fi.fileMode
}
// ModTime
func (fi *MinioFileInfo) ModTime() time.Time {
return fi.updated
}
// IsDir
func (fi *MinioFileInfo) IsDir() bool {
return fi.isDir
}
// Sys
func (fi *MinioFileInfo) Sys() any {
return nil
}
type ByName []*MinioFileInfo
// Len
func (a ByName) Len() int { return len(a) }
// Swap
func (a ByName) Swap(i, j int) {
a[i].name, a[j].name = a[j].name, a[i].name
a[i].size, a[j].size = a[j].size, a[i].size
a[i].updated, a[j].updated = a[j].updated, a[i].updated
a[i].isDir, a[j].isDir = a[j].isDir, a[i].isDir
}
// Less
func (a ByName) Less(i, j int) bool {
return strings.Compare(a[i].Name(), a[j].Name()) == -1
}
+15
View File
@@ -0,0 +1,15 @@
package miniostorage
import "net/url"
// getUserPassword
func getUserPassword(u *url.URL) (string, string) {
var user, password string
if u.User != nil {
user = u.User.Username()
if p, ok := u.User.Password(); ok {
password = p
}
}
return user, password
}
+148
View File
@@ -0,0 +1,148 @@
package miniostorage
import (
"context"
"errors"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/tenrok/filestore/remote"
)
const defaultFileMode = 0o755
var _ remote.Storage = (*MinioStorage)(nil)
func init() {
remote.Register("minio", &MinioStorage{})
}
type MinioStorage struct {
ctx context.Context
client *minio.Client
bucket string
separator string
}
// NewStorage
func (s *MinioStorage) NewStorage(ctx context.Context, connString string) (remote.Storage, error) {
u, err := url.Parse(connString)
if err != nil {
return nil, err
}
queries := u.Query()
username, password := getUserPassword(u)
token := queries.Get("token")
opts := &minio.Options{
Creds: credentials.NewStaticV4(username, password, token),
Region: "us-east-1",
}
if queries.Has("secure") {
secure, err := strconv.ParseBool(queries.Get("secure"))
if err != nil {
return nil, err
}
opts.Secure = secure
}
if queries.Has("region") {
opts.Region = queries.Get("region")
}
client, err := minio.New(u.Host, opts)
if err != nil {
return nil, err
}
s.ctx = ctx
s.client = client
s.bucket = u.Path[1:]
s.separator = "/"
return s, nil
}
// normSeparators will normalize all "\\" and "/" to the provided separator
func (s *MinioStorage) normSeparators(str string) string {
return strings.Replace(strings.Replace(str, "\\", s.separator, -1), "/", s.separator, -1)
}
// Create
func (s *MinioStorage) Create(name string) (http.File, error) {
return s.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0)
}
// Open
func (s *MinioStorage) Open(name string) (http.File, error) {
return s.OpenFile(name, os.O_RDONLY, 0)
}
// OpenFile
func (s *MinioStorage) OpenFile(name string, flag int, fileMode os.FileMode) (http.File, error) {
if flag&os.O_APPEND != 0 {
return nil, errors.New("appending files will lead to trouble")
}
name = strings.TrimPrefix(s.normSeparators(name), s.separator)
file := NewMinioFile(s.ctx, s, flag, fileMode, name)
var err error
if flag&os.O_CREATE != 0 {
_, err = file.WriteString("")
}
return file, err
}
// Remove
func (s *MinioStorage) Remove(name string) error {
name = strings.TrimPrefix(s.normSeparators(name), s.separator)
return s.client.RemoveObject(s.ctx, s.bucket, name, minio.RemoveObjectOptions{GovernanceBypass: true})
}
// RemoveAll
func (s *MinioStorage) RemoveAll(path string) error {
path = strings.TrimPrefix(s.normSeparators(path), s.separator)
objectsCh := make(chan minio.ObjectInfo)
go func() {
defer close(objectsCh)
opts := minio.ListObjectsOptions{Prefix: path, Recursive: true}
for object := range s.client.ListObjects(s.ctx, s.bucket, opts) {
if object.Err != nil {
panic(object.Err)
}
objectsCh <- object
}
}()
errorCh := s.client.RemoveObjects(s.ctx, s.bucket, objectsCh, minio.RemoveObjectsOptions{})
for e := range errorCh {
return errors.New("Failed to remove " + e.ObjectName + ", error: " + e.Err.Error())
}
return nil
}
// Rename
func (s *MinioStorage) Rename(oldName, newName string) error {
if oldName == newName {
return nil
}
oldName = strings.TrimPrefix(s.normSeparators(oldName), s.separator)
newName = strings.TrimPrefix(s.normSeparators(newName), s.separator)
src := minio.CopySrcOptions{
Bucket: s.bucket,
Object: oldName,
}
dst := minio.CopyDestOptions{
Bucket: s.bucket,
Object: newName,
}
if _, err := s.client.CopyObject(s.ctx, dst, src); err != nil {
return err
}
return s.Remove(oldName)
}
// Stat
func (s *MinioStorage) Stat(name string) (os.FileInfo, error) {
name = strings.TrimPrefix(s.normSeparators(name), s.separator)
file := NewMinioFile(s.ctx, s, os.O_RDWR, defaultFileMode, name)
return file.Stat()
}
+52
View File
@@ -0,0 +1,52 @@
package remote
import (
"context"
"fmt"
"net/http"
"sync"
)
var (
storagesMu sync.RWMutex
storages = make(map[string]Storage)
)
type Storage interface {
NewStorage(ctx context.Context, connString string) (Storage, error)
// Create(name string) (http.File, error)
Open(name string) (http.File, error)
// OpenFile(name string, flag int, fileMode os.FileMode) (http.File, error)
// Remove(name string) error
// RemoveAll(path string) error
// Rename(oldName, newName string) error
// Stat(name string) (os.FileInfo, error)
}
// NewStorage returns a new remote storage instance.
func NewStorage(ctx context.Context, connString string) (Storage, error) {
scheme, err := schemeFromURL(connString)
if err != nil {
return nil, err
}
storagesMu.RLock()
s, ok := storages[scheme]
storagesMu.RUnlock()
if !ok {
return nil, fmt.Errorf("unknown storage %v (forgotten import?)", scheme)
}
return s.NewStorage(ctx, connString)
}
// Register globally registers a storage
func Register(name string, storage Storage) {
storagesMu.Lock()
defer storagesMu.Unlock()
if storage == nil {
panic("Register storage is nil")
}
if _, exists := storages[name]; exists {
panic("Register called twice for storage " + name)
}
storages[name] = storage
}