Merge branch 'master' into mq-subscribe

This commit is contained in:
chrislu 2024-03-21 08:22:14 -07:00
commit e641d49f9f
3 changed files with 14 additions and 9 deletions

View File

@ -394,7 +394,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
glog.V(0).Infof("received %v", resp) glog.V(0).Infof("received %v", resp)
} }
if isMultipartUploadDir(resp.Directory) { if isMultipartUploadDir(resp.Directory + "/") {
return nil return nil
} }

View File

@ -175,18 +175,23 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
uploader.PartSize = 0 uploader.PartSize = 0
} }
} }
if _, ok := entry.Extended[s3_constants.AmzUserMetaMtime]; !ok {
doSaveMtime := true
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
} else if _, ok := entry.Extended[s3_constants.AmzUserMetaMtime]; ok {
doSaveMtime = false
}
if doSaveMtime {
entry.Extended[s3_constants.AmzUserMetaMtime] = []byte(strconv.FormatInt(entry.Attributes.Mtime, 10)) entry.Extended[s3_constants.AmzUserMetaMtime] = []byte(strconv.FormatInt(entry.Attributes.Mtime, 10))
} }
// process tagging // process tagging
tags := "" tags := ""
if true { for k, v := range entry.Extended {
for k, v := range entry.Extended { if len(tags) > 0 {
if len(tags) > 0 { tags = tags + "&"
tags = tags + "&"
}
tags = tags + k + "=" + string(v)
} }
tags = tags + k + "=" + string(v)
} }
// Upload the file to S3. // Upload the file to S3.

View File

@ -57,7 +57,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
if bytesBuf != nil { if bytesBuf != nil {
readSize = bytesBuf.Len() readSize = bytesBuf.Len()
} }
glog.V(1).Infof("%s ReadFromBuffer at %v batch %d. Read bytes %v batch %d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, batchIndex) glog.V(4).Infof("%s ReadFromBuffer at %v batch %d. Read bytes %v batch %d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, batchIndex)
if bytesBuf == nil { if bytesBuf == nil {
if batchIndex >= 0 { if batchIndex >= 0 {
lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), batchIndex) lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), batchIndex)