mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2025-01-18 14:41:31 +08:00
add streaming v4
This commit is contained in:
parent
b90ad6f452
commit
f3ce3166ad
@ -48,6 +48,8 @@ func (iam *IdentityAccessManagement) reqSignatureV4Verify(r *http.Request) (*Ide
|
||||
const (
|
||||
emptySHA256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
|
||||
streamingContentSHA256 = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
|
||||
signV4ChunkedAlgorithm = "AWS4-HMAC-SHA256-PAYLOAD"
|
||||
streamingContentEncoding = "aws-chunked"
|
||||
|
||||
// http Header "x-amz-content-sha256" == "UNSIGNED-PAYLOAD" indicates that the
|
||||
// client did not calculate sha256 of the payload.
|
||||
|
@ -21,12 +21,115 @@ package s3api
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"github.com/dustin/go-humanize"
|
||||
"hash"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/dustin/go-humanize"
|
||||
)
|
||||
|
||||
// getChunkSignature - get chunk signature.
|
||||
func getChunkSignature(secretKey string, seedSignature string, region string, date time.Time, hashedChunk string) string {
|
||||
|
||||
// Calculate string to sign.
|
||||
stringToSign := signV4ChunkedAlgorithm + "\n" +
|
||||
date.Format(iso8601Format) + "\n" +
|
||||
getScope(date, region) + "\n" +
|
||||
seedSignature + "\n" +
|
||||
emptySHA256 + "\n" +
|
||||
hashedChunk
|
||||
|
||||
// Get hmac signing key.
|
||||
signingKey := getSigningKey(secretKey, date, region)
|
||||
|
||||
// Calculate signature.
|
||||
newSignature := getSignature(signingKey, stringToSign)
|
||||
|
||||
return newSignature
|
||||
}
|
||||
|
||||
// calculateSeedSignature - Calculate seed signature in accordance with
|
||||
// - http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
|
||||
// returns signature, error otherwise if the signature mismatches or any other
|
||||
// error while parsing and validating.
|
||||
func (iam *IdentityAccessManagement) calculateSeedSignature(r *http.Request) (cred *Credential, signature string, region string, date time.Time, errCode ErrorCode) {
|
||||
|
||||
// Copy request.
|
||||
req := *r
|
||||
|
||||
// Save authorization header.
|
||||
v4Auth := req.Header.Get("Authorization")
|
||||
|
||||
// Parse signature version '4' header.
|
||||
signV4Values, errCode := parseSignV4(v4Auth)
|
||||
if errCode != ErrNone {
|
||||
return nil, "", "", time.Time{}, errCode
|
||||
}
|
||||
|
||||
// Payload streaming.
|
||||
payload := streamingContentSHA256
|
||||
|
||||
// Payload for STREAMING signature should be 'STREAMING-AWS4-HMAC-SHA256-PAYLOAD'
|
||||
if payload != req.Header.Get("X-Amz-Content-Sha256") {
|
||||
return nil, "", "", time.Time{}, ErrContentSHA256Mismatch
|
||||
}
|
||||
|
||||
// Extract all the signed headers along with its values.
|
||||
extractedSignedHeaders, errCode := extractSignedHeaders(signV4Values.SignedHeaders, r)
|
||||
if errCode != ErrNone {
|
||||
return nil, "", "", time.Time{}, errCode
|
||||
}
|
||||
// Verify if the access key id matches.
|
||||
_, cred, found := iam.lookupByAccessKey(signV4Values.Credential.accessKey)
|
||||
if !found {
|
||||
return nil, "", "", time.Time{}, ErrInvalidAccessKeyID
|
||||
}
|
||||
|
||||
// Verify if region is valid.
|
||||
region = signV4Values.Credential.scope.region
|
||||
|
||||
// Extract date, if not present throw error.
|
||||
var dateStr string
|
||||
if dateStr = req.Header.Get(http.CanonicalHeaderKey("x-amz-date")); dateStr == "" {
|
||||
if dateStr = r.Header.Get("Date"); dateStr == "" {
|
||||
return nil, "", "", time.Time{}, ErrMissingDateHeader
|
||||
}
|
||||
}
|
||||
// Parse date header.
|
||||
var err error
|
||||
date, err = time.Parse(iso8601Format, dateStr)
|
||||
if err != nil {
|
||||
return nil, "", "", time.Time{}, ErrMalformedDate
|
||||
}
|
||||
|
||||
// Query string.
|
||||
queryStr := req.URL.Query().Encode()
|
||||
|
||||
// Get canonical request.
|
||||
canonicalRequest := getCanonicalRequest(extractedSignedHeaders, payload, queryStr, req.URL.Path, req.Method)
|
||||
|
||||
// Get string to sign from canonical request.
|
||||
stringToSign := getStringToSign(canonicalRequest, date, signV4Values.Credential.getScope())
|
||||
|
||||
// Get hmac signing key.
|
||||
signingKey := getSigningKey(cred.SecretKey, signV4Values.Credential.scope.date, region)
|
||||
|
||||
// Calculate signature.
|
||||
newSignature := getSignature(signingKey, stringToSign)
|
||||
|
||||
// Verify if signature match.
|
||||
if !compareSignatureV4(newSignature, signV4Values.Signature) {
|
||||
return nil, "", "", time.Time{}, ErrSignatureDoesNotMatch
|
||||
}
|
||||
|
||||
// Return caculated signature.
|
||||
return cred, newSignature, region, date, ErrNone
|
||||
}
|
||||
|
||||
const maxLineLength = 4 * humanize.KiByte // assumed <= bufio.defaultBufSize 4KiB
|
||||
|
||||
// lineTooLong is generated as chunk header is bigger than 4KiB.
|
||||
@ -38,22 +141,36 @@ var errMalformedEncoding = errors.New("malformed chunked encoding")
|
||||
// newSignV4ChunkedReader returns a new s3ChunkedReader that translates the data read from r
|
||||
// out of HTTP "chunked" format before returning it.
|
||||
// The s3ChunkedReader returns io.EOF when the final 0-length chunk is read.
|
||||
func newSignV4ChunkedReader(req *http.Request) io.ReadCloser {
|
||||
return &s3ChunkedReader{
|
||||
reader: bufio.NewReader(req.Body),
|
||||
state: readChunkHeader,
|
||||
func (iam *IdentityAccessManagement) newSignV4ChunkedReader(req *http.Request) (io.ReadCloser, ErrorCode) {
|
||||
ident, seedSignature, region, seedDate, errCode := iam.calculateSeedSignature(req)
|
||||
if errCode != ErrNone {
|
||||
return nil, errCode
|
||||
}
|
||||
return &s3ChunkedReader{
|
||||
cred: ident,
|
||||
reader: bufio.NewReader(req.Body),
|
||||
seedSignature: seedSignature,
|
||||
seedDate: seedDate,
|
||||
region: region,
|
||||
chunkSHA256Writer: sha256.New(),
|
||||
state: readChunkHeader,
|
||||
}, ErrNone
|
||||
}
|
||||
|
||||
// Represents the overall state that is required for decoding a
|
||||
// AWS Signature V4 chunked reader.
|
||||
type s3ChunkedReader struct {
|
||||
reader *bufio.Reader
|
||||
state chunkState
|
||||
lastChunk bool
|
||||
chunkSignature string
|
||||
n uint64 // Unread bytes in chunk
|
||||
err error
|
||||
cred *Credential
|
||||
reader *bufio.Reader
|
||||
seedSignature string
|
||||
seedDate time.Time
|
||||
region string
|
||||
state chunkState
|
||||
lastChunk bool
|
||||
chunkSignature string
|
||||
chunkSHA256Writer hash.Hash // Calculates sha256 of chunk data.
|
||||
n uint64 // Unread bytes in chunk
|
||||
err error
|
||||
}
|
||||
|
||||
// Read chunk reads the chunk token signature portion.
|
||||
@ -152,6 +269,9 @@ func (cr *s3ChunkedReader) Read(buf []byte) (n int, err error) {
|
||||
return 0, cr.err
|
||||
}
|
||||
|
||||
// Calculate sha256.
|
||||
cr.chunkSHA256Writer.Write(rbuf[:n0])
|
||||
|
||||
// Update the bytes read into request buffer so far.
|
||||
n += n0
|
||||
buf = buf[n0:]
|
||||
@ -164,6 +284,19 @@ func (cr *s3ChunkedReader) Read(buf []byte) (n int, err error) {
|
||||
continue
|
||||
}
|
||||
case verifyChunk:
|
||||
// Calculate the hashed chunk.
|
||||
hashedChunk := hex.EncodeToString(cr.chunkSHA256Writer.Sum(nil))
|
||||
// Calculate the chunk signature.
|
||||
newSignature := getChunkSignature(cr.cred.SecretKey, cr.seedSignature, cr.region, cr.seedDate, hashedChunk)
|
||||
if !compareSignatureV4(cr.chunkSignature, newSignature) {
|
||||
// Chunk signature doesn't match we return signature does not match.
|
||||
cr.err = errors.New("chunk signature does not match")
|
||||
return 0, cr.err
|
||||
}
|
||||
// Newly calculated signature becomes the seed for the next chunk
|
||||
// this follows the chaining.
|
||||
cr.seedSignature = newSignature
|
||||
cr.chunkSHA256Writer.Reset()
|
||||
if cr.lastChunk {
|
||||
cr.state = eofChunk
|
||||
} else {
|
||||
|
@ -41,8 +41,13 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
|
||||
|
||||
rAuthType := getRequestAuthType(r)
|
||||
dataReader := r.Body
|
||||
var s3ErrCode ErrorCode
|
||||
if rAuthType == authTypeStreamingSigned {
|
||||
dataReader = newSignV4ChunkedReader(r)
|
||||
dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r)
|
||||
}
|
||||
if s3ErrCode != ErrNone {
|
||||
writeErrorResponse(w, s3ErrCode, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
uploadUrl := fmt.Sprintf("http://%s%s/%s%s?collection=%s",
|
||||
|
@ -3,13 +3,14 @@ package s3api
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/gorilla/mux"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/gorilla/mux"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -195,9 +196,14 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
|
||||
return
|
||||
}
|
||||
|
||||
var s3ErrCode ErrorCode
|
||||
dataReader := r.Body
|
||||
if rAuthType == authTypeStreamingSigned {
|
||||
dataReader = newSignV4ChunkedReader(r)
|
||||
dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r)
|
||||
}
|
||||
if s3ErrCode != ErrNone {
|
||||
writeErrorResponse(w, s3ErrCode, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part?collection=%s",
|
||||
|
Loading…
Reference in New Issue
Block a user