diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java index 81ce3d8cf..e9453d1bb 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java @@ -56,9 +56,12 @@ public class SeaweedRead { HttpClient client = HttpClientBuilder.create().build(); HttpGet request = new HttpGet( String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); - request.setHeader(HttpHeaders.ACCEPT_ENCODING, ""); - request.setHeader(HttpHeaders.RANGE, - String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size)); + + if (!chunkView.isFullChunk){ + request.setHeader(HttpHeaders.ACCEPT_ENCODING, ""); + request.setHeader(HttpHeaders.RANGE, + String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size)); + } try { HttpResponse response = client.execute(request); @@ -86,11 +89,13 @@ public class SeaweedRead { long stop = offset + size; for (VisibleInterval chunk : visibleIntervals) { if (chunk.start <= offset && offset < chunk.stop && offset < stop) { + boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop; views.add(new ChunkView( chunk.fileId, offset - chunk.start, Math.min(chunk.stop, stop) - offset, - offset + offset, + isFullChunk )); offset = Math.min(chunk.stop, stop); } @@ -123,7 +128,8 @@ public class SeaweedRead { chunk.getOffset(), chunk.getOffset() + chunk.getSize(), chunk.getFileId(), - chunk.getMtime() + chunk.getMtime(), + true ); // easy cases to speed up @@ -142,7 +148,8 @@ public class SeaweedRead { v.start, chunk.getOffset(), v.fileId, - v.modifiedTime + v.modifiedTime, + false )); } long chunkStop = chunk.getOffset() + chunk.getSize(); @@ -151,7 +158,8 @@ public class SeaweedRead { chunkStop, v.stop, v.fileId, - v.modifiedTime + v.modifiedTime, + false )); } if (chunkStop <= v.start || v.stop <= chunk.getOffset()) { @@ -197,21 +205,24 @@ public class SeaweedRead { public final long stop; public final long modifiedTime; public final String fileId; + public final boolean isFullChunk; - public VisibleInterval(long start, long stop, String fileId, long modifiedTime) { + public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk) { this.start = start; this.stop = stop; this.modifiedTime = modifiedTime; this.fileId = fileId; + this.isFullChunk = isFullChunk; } @Override public String toString() { - return "VisibleIntervalq{" + + return "VisibleInterval{" + "start=" + start + ", stop=" + stop + ", modifiedTime=" + modifiedTime + ", fileId='" + fileId + '\'' + + ", isFullChunk=" + isFullChunk + '}'; } } @@ -221,12 +232,14 @@ public class SeaweedRead { public final long offset; public final long size; public final long logicOffset; + public final boolean isFullChunk; - public ChunkView(String fileId, long offset, long size, long logicOffset) { + public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk) { this.fileId = fileId; this.offset = offset; this.size = size; this.logicOffset = logicOffset; + this.isFullChunk = isFullChunk; } @Override @@ -236,6 +249,7 @@ public class SeaweedRead { ", offset=" + offset + ", size=" + size + ", logicOffset=" + logicOffset + + ", isFullChunk=" + isFullChunk + '}'; } } diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go index 8f9746324..39e43cf3c 100644 --- a/weed/filer2/filechunks.go +++ b/weed/filer2/filechunks.go @@ -70,6 +70,7 @@ type ChunkView struct { Offset int64 Size uint64 LogicOffset int64 + IsFullChunk bool } func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views []*ChunkView) { @@ -80,11 +81,13 @@ func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int) (views for _, chunk := range visibles { if chunk.start <= offset && offset < chunk.stop && offset < stop { + isFullChunk := chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop views = append(views, &ChunkView{ FileId: chunk.fileId, Offset: offset - chunk.start, // offset is the data starting location in this file id Size: uint64(min(chunk.stop, stop) - offset), LogicOffset: offset, + IsFullChunk: isFullChunk, }) offset = min(chunk.stop, stop) } @@ -116,6 +119,7 @@ func mergeIntoVisibles(visibles, newVisibles []*visibleInterval, chunk *filer_pb chunk.Offset+int64(chunk.Size), chunk.FileId, chunk.Mtime, + true, ) length := len(visibles) @@ -135,6 +139,7 @@ func mergeIntoVisibles(visibles, newVisibles []*visibleInterval, chunk *filer_pb chunk.Offset, v.fileId, v.modifiedTime, + false, )) } chunkStop := chunk.Offset + int64(chunk.Size) @@ -144,6 +149,7 @@ func mergeIntoVisibles(visibles, newVisibles []*visibleInterval, chunk *filer_pb v.stop, v.fileId, v.modifiedTime, + false, )) } if chunkStop <= v.start || v.stop <= chunk.Offset { @@ -195,14 +201,16 @@ type visibleInterval struct { stop int64 modifiedTime int64 fileId string + isFullChunk bool } -func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64) *visibleInterval { +func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, isFullChunk bool) *visibleInterval { return &visibleInterval{ start: start, stop: stop, fileId: fileId, modifiedTime: modifiedTime, + isFullChunk: isFullChunk, } } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index b4c2920bb..e74228f3c 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -117,7 +117,8 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId), chunkView.Offset, int(chunkView.Size), - buff[chunkView.LogicOffset-req.Offset:chunkView.LogicOffset-req.Offset+int64(chunkView.Size)]) + buff[chunkView.LogicOffset-req.Offset:chunkView.LogicOffset-req.Offset+int64(chunkView.Size)], + !chunkView.IsFullChunk) if err != nil { diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go index 8ff722e67..5c4be7aee 100644 --- a/weed/replication/sink/s3sink/s3_write.go +++ b/weed/replication/sink/s3sink/s3_write.go @@ -161,6 +161,6 @@ func (s3sink *S3Sink) buildReadSeeker(chunk *filer2.ChunkView) (io.ReadSeeker, e return nil, err } buf := make([]byte, chunk.Size) - util.ReadUrl(fileUrl, chunk.Offset, int(chunk.Size), buf) + util.ReadUrl(fileUrl, chunk.Offset, int(chunk.Size), buf, true) return bytes.NewReader(buf), nil } diff --git a/weed/util/http_util.go b/weed/util/http_util.go index 7ae5713bb..77a7a5fa3 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -2,6 +2,7 @@ package util import ( "bytes" + "compress/gzip" "encoding/json" "errors" "fmt" @@ -184,24 +185,38 @@ func NormalizeUrl(url string) string { return "http://" + url } -func ReadUrl(fileUrl string, offset int64, size int, buf []byte) (n int64, e error) { +func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange bool) (n int64, e error) { req, _ := http.NewRequest("GET", fileUrl, nil) - req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size))) + if isReadRange { + req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size))) + } else { + req.Header.Set("Accept-Encoding", "gzip") + } r, err := client.Do(req) if err != nil { return 0, err } + defer r.Body.Close() if r.StatusCode >= 400 { return 0, fmt.Errorf("%s: %s", fileUrl, r.Status) } + var reader io.ReadCloser + switch r.Header.Get("Content-Encoding") { + case "gzip": + reader, err = gzip.NewReader(r.Body) + defer reader.Close() + default: + reader = r.Body + } + var i, m int for { - m, err = r.Body.Read(buf[i:]) + m, err = reader.Read(buf[i:]) if m == 0 { return }