Merge pull request #2 from chrislusf/master

merge
yourchanges 2020-07-17 19:49:16 +08:00 committed by GitHub
commit 64df5207db
78 changed files with 1548 additions and 2415 deletions

View File

@ -123,7 +123,7 @@ On top of the object store, optional [Filer] can support directories and POSIX a
* [Kubernetes CSI Driver][SeaweedFsCsiDriver] A Container Storage Interface (CSI) Driver. [![Docker Pulls](https://img.shields.io/docker/pulls/chrislusf/seaweedfs-csi-driver.svg?maxAge=604800)](https://hub.docker.com/r/chrislusf/seaweedfs-csi-driver/)
[Filer]: https://github.com/chrislusf/seaweedfs/wiki/Directories-and-Files
[Mount]: https://github.com/chrislusf/seaweedfs/wiki/Mount
[Mount]: https://github.com/chrislusf/seaweedfs/wiki/FUSE-Mount
[AmazonS3API]: https://github.com/chrislusf/seaweedfs/wiki/Amazon-S3-API
[BackupToCloud]: https://github.com/chrislusf/seaweedfs/wiki/Backup-to-Cloud
[Hadoop]: https://github.com/chrislusf/seaweedfs/wiki/Hadoop-Compatible-File-System

View File

@ -1,4 +1,4 @@
apiVersion: v1
description: SeaweedFS
name: seaweedfs
version: 1.84
version: 1.85

View File

@ -4,7 +4,7 @@ global:
registry: ""
repository: ""
imageName: chrislusf/seaweedfs
imageTag: "1.84"
imageTag: "1.85"
imagePullPolicy: IfNotPresent
imagePullSecrets: imagepullsecret
restartPolicy: Always

View File

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.2.9</version>
<version>1.3.6</version>
<parent>
<groupId>org.sonatype.oss</groupId>

View File

@ -0,0 +1,170 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.3.6</version>
<parent>
<groupId>org.sonatype.oss</groupId>
<artifactId>oss-parent</artifactId>
<version>9</version>
</parent>
<properties>
<protobuf.version>3.9.1</protobuf.version>
<!-- follow https://github.com/grpc/grpc-java -->
<grpc.version>1.23.0</grpc.version>
<guava.version>28.0-jre</guava.version>
</properties>
<dependencies>
<dependency>
<groupId>com.moandjiezana.toml</groupId>
<artifactId>toml4j</artifactId>
<version>0.7.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.6</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
</distributionManagement>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.7</version>
<extensions>true</extensions>
<configuration>
<serverId>ossrh</serverId>
<nexusUrl>https://oss.sonatype.org/</nexusUrl>
<autoReleaseAfterClose>true</autoReleaseAfterClose>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.2.9</version>
<version>1.3.6</version>
<parent>
<groupId>org.sonatype.oss</groupId>

View File

@ -0,0 +1,22 @@
package seaweedfs.client;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class ByteBufferPool {
static List<ByteBuffer> bufferList = new ArrayList<>();
public static synchronized ByteBuffer request(int bufferSize) {
if (bufferList.isEmpty()) {
return ByteBuffer.allocate(bufferSize);
}
return bufferList.remove(bufferList.size()-1);
}
public static synchronized void release(ByteBuffer obj) {
bufferList.add(obj);
}
}
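This pool is new in this commit and backs the rewritten SeaweedOutputStream write buffer (below). A minimal usage sketch, assuming a caller that requests and returns one buffer; note that release() does not check the buffer's capacity, so a single pool should only ever see one buffer size:

import java.nio.ByteBuffer;
import seaweedfs.client.ByteBufferPool;

public class ByteBufferPoolDemo {
    public static void main(String[] args) {
        // Allocates only when the free list is empty; otherwise reuses a returned buffer.
        ByteBuffer buf = ByteBufferPool.request(4 * 1024 * 1024); // size is illustrative
        try {
            buf.put((byte) 42); // ... fill and drain the buffer ...
        } finally {
            buf.clear();                 // reset position/limit before handing it back
            ByteBufferPool.release(buf); // return to the shared free list
        }
    }
}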

View File

@ -7,9 +7,12 @@ import java.util.concurrent.TimeUnit;
public class ChunkCache {
private final Cache<String, byte[]> cache;
private Cache<String, byte[]> cache = null;
public ChunkCache(int maxEntries) {
if (maxEntries == 0) {
return;
}
this.cache = CacheBuilder.newBuilder()
.maximumSize(maxEntries)
.expireAfterAccess(1, TimeUnit.HOURS)
@ -17,10 +20,16 @@ public class ChunkCache {
}
public byte[] getChunk(String fileId) {
if (this.cache == null) {
return null;
}
return this.cache.getIfPresent(fileId);
}
public void setChunk(String fileId, byte[] data) {
if (this.cache == null) {
return;
}
this.cache.put(fileId, data);
}
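With this change a maxEntries of 0 turns the cache into a no-op instead of building a zero-sized Guava cache. A short sketch of both modes (the file id is illustrative):

import seaweedfs.client.ChunkCache;

public class ChunkCacheDemo {
    public static void main(String[] args) {
        ChunkCache disabled = new ChunkCache(0);          // caching off: inner cache stays null
        disabled.setChunk("3,01637037d6", new byte[]{1}); // silently ignored
        System.out.println(disabled.getChunk("3,01637037d6")); // null

        ChunkCache enabled = new ChunkCache(16);          // the new default used by SeaweedRead
        enabled.setChunk("3,01637037d6", new byte[]{1});
        System.out.println(enabled.getChunk("3,01637037d6").length); // 1, until evicted or idle for 1h
    }
}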

View File

@ -156,7 +156,7 @@ public class FilerClient {
List<FilerProto.Entry> results = new ArrayList<FilerProto.Entry>();
String lastFileName = "";
for (int limit = Integer.MAX_VALUE; limit > 0; ) {
List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024);
List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024, false);
if (t == null) {
break;
}
@ -173,11 +173,12 @@ public class FilerClient {
return results;
}
public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit) {
public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit, boolean includeLastEntry) {
Iterator<FilerProto.ListEntriesResponse> iter = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
.setDirectory(path)
.setPrefix(entryPrefix)
.setStartFromFileName(lastEntryName)
.setInclusiveStartFrom(includeLastEntry)
.setLimit(limit)
.build());
List<FilerProto.Entry> entries = new ArrayList<>();
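The new includeLastEntry flag controls whether the entry named by lastEntryName is itself returned, which lets callers page with an exclusive cursor exactly as the loop above does. A sketch of that paging pattern, assuming FilerClient's host/gRPC-port constructor and a filer at localhost:18888:

import seaweedfs.client.FilerClient;
import seaweedfs.client.FilerProto;
import java.util.List;

public class ListEntriesDemo {
    public static void main(String[] args) {
        FilerClient filerClient = new FilerClient("localhost", 18888); // assumed filer gRPC address
        String lastFileName = "";
        while (true) {
            // includeLastEntry=false: do not repeat the cursor entry in the next page
            List<FilerProto.Entry> page =
                    filerClient.listEntries("/buckets/demo", "", lastFileName, 1024, false);
            if (page == null || page.isEmpty()) {
                break;
            }
            for (FilerProto.Entry entry : page) {
                System.out.println(entry.getName());
            }
            lastFileName = page.get(page.size() - 1).getName();
        }
    }
}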

View File

@ -39,8 +39,10 @@ public class FilerGrpcClient {
public FilerGrpcClient(String host, int grpcPort, SslContext sslContext) {
this(sslContext == null ?
ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext() :
ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext()
.maxInboundMessageSize(1024 * 1024 * 1024) :
NettyChannelBuilder.forAddress(host, grpcPort)
.maxInboundMessageSize(1024 * 1024 * 1024)
.negotiationType(NegotiationType.TLS)
.sslContext(sslContext));
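Both channel variants now raise gRPC's inbound message cap from the grpc-java default of 4 MiB to 1 GiB, so unusually large responses from the filer are not rejected. The same builder call in isolation, with an assumed address:

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class ChannelDemo {
    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder
                .forAddress("localhost", 18888)            // assumed filer gRPC endpoint
                .usePlaintext()
                .maxInboundMessageSize(1024 * 1024 * 1024) // 1 GiB, as in the patch
                .build();
        channel.shutdown();
    }
}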

View File

@ -4,6 +4,7 @@ import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
@ -18,7 +19,7 @@ public class SeaweedRead {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
static ChunkCache chunkCache = new ChunkCache(1000);
static ChunkCache chunkCache = new ChunkCache(16);
// returns bytesRead
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
@ -78,7 +79,6 @@ public class SeaweedRead {
private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
HttpClient client = new DefaultHttpClient();
HttpGet request = new HttpGet(
String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
@ -86,20 +86,21 @@ public class SeaweedRead {
byte[] data = null;
CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(request);
try {
HttpResponse response = client.execute(request);
HttpEntity entity = response.getEntity();
data = EntityUtils.toByteArray(entity);
EntityUtils.consume(entity);
} finally {
if (client instanceof Closeable) {
Closeable t = (Closeable) client;
t.close();
}
response.close();
request.releaseConnection();
}
if (chunkView.isGzipped) {
if (chunkView.isCompressed) {
data = Gzip.decompress(data);
}
@ -129,7 +130,7 @@ public class SeaweedRead {
offset,
isFullChunk,
chunk.cipherKey,
chunk.isGzipped
chunk.isCompressed
));
offset = Math.min(chunk.stop, stop);
}
@ -165,7 +166,7 @@ public class SeaweedRead {
chunk.getMtime(),
true,
chunk.getCipherKey().toByteArray(),
chunk.getIsGzipped()
chunk.getIsCompressed()
);
// easy cases to speed up
@ -187,7 +188,7 @@ public class SeaweedRead {
v.modifiedTime,
false,
v.cipherKey,
v.isGzipped
v.isCompressed
));
}
long chunkStop = chunk.getOffset() + chunk.getSize();
@ -199,7 +200,7 @@ public class SeaweedRead {
v.modifiedTime,
false,
v.cipherKey,
v.isGzipped
v.isCompressed
));
}
if (chunkStop <= v.start || v.stop <= chunk.getOffset()) {
@ -247,16 +248,16 @@ public class SeaweedRead {
public final String fileId;
public final boolean isFullChunk;
public final byte[] cipherKey;
public final boolean isGzipped;
public final boolean isCompressed;
public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) {
public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.start = start;
this.stop = stop;
this.modifiedTime = modifiedTime;
this.fileId = fileId;
this.isFullChunk = isFullChunk;
this.cipherKey = cipherKey;
this.isGzipped = isGzipped;
this.isCompressed = isCompressed;
}
@Override
@ -268,7 +269,7 @@ public class SeaweedRead {
", fileId='" + fileId + '\'' +
", isFullChunk=" + isFullChunk +
", cipherKey=" + Arrays.toString(cipherKey) +
", isGzipped=" + isGzipped +
", isCompressed=" + isCompressed +
'}';
}
}
@ -280,16 +281,16 @@ public class SeaweedRead {
public final long logicOffset;
public final boolean isFullChunk;
public final byte[] cipherKey;
public final boolean isGzipped;
public final boolean isCompressed;
public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) {
public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.fileId = fileId;
this.offset = offset;
this.size = size;
this.logicOffset = logicOffset;
this.isFullChunk = isFullChunk;
this.cipherKey = cipherKey;
this.isGzipped = isGzipped;
this.isCompressed = isCompressed;
}
@Override
@ -301,7 +302,7 @@ public class SeaweedRead {
", logicOffset=" + logicOffset +
", isFullChunk=" + isFullChunk +
", cipherKey=" + Arrays.toString(cipherKey) +
", isGzipped=" + isGzipped +
", isCompressed=" + isCompressed +
'}';
}
}

View File

@ -0,0 +1,27 @@
package seaweedfs.client;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
public class SeaweedUtil {
static PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
static {
// Increase the max total connections to 200
cm.setMaxTotal(200);
// Increase the default max connections per route to 20
cm.setDefaultMaxPerRoute(20);
}
public static CloseableHttpClient getClosableHttpClient() {
return HttpClientBuilder.create()
.setConnectionManager(cm)
.setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE)
.setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
.build();
}
}
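SeaweedRead and SeaweedWrite (below) both switch from a throwaway DefaultHttpClient to this shared pool: every getClosableHttpClient() call builds a client over the same PoolingHttpClientConnectionManager, so connections to volume servers are kept alive and reused. A usage sketch with a hypothetical chunk URL:

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.util.EntityUtils;
import seaweedfs.client.SeaweedUtil;

public class PooledFetchDemo {
    public static void main(String[] args) throws Exception {
        HttpGet request = new HttpGet("http://localhost:8080/3,01637037d6"); // hypothetical volume-server URL
        CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(request);
        try {
            byte[] data = EntityUtils.toByteArray(response.getEntity());
            System.out.println("fetched " + data.length + " bytes");
        } finally {
            response.close();            // release the leased connection back to the pool
            request.releaseConnection(); // same cleanup the patched read/write paths perform
        }
    }
}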

View File

@ -3,10 +3,11 @@ package seaweedfs.client;
import com.google.protobuf.ByteString;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.mime.HttpMultipartMode;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
@ -16,7 +17,7 @@ import java.security.SecureRandom;
public class SeaweedWrite {
private static SecureRandom random = new SecureRandom();
private static final SecureRandom random = new SecureRandom();
public static void writeData(FilerProto.Entry.Builder entry,
final String replication,
@ -79,8 +80,6 @@ public class SeaweedWrite {
final long bytesOffset, final long bytesLength,
byte[] cipherKey) throws IOException {
HttpClient client = new DefaultHttpClient();
InputStream inputStream = null;
if (cipherKey == null || cipherKey.length == 0) {
inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
@ -103,8 +102,9 @@ public class SeaweedWrite {
.addBinaryBody("upload", inputStream)
.build());
CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(post);
try {
HttpResponse response = client.execute(post);
String etag = response.getLastHeader("ETag").getValue();
@ -112,12 +112,12 @@ public class SeaweedWrite {
etag = etag.substring(1, etag.length() - 1);
}
EntityUtils.consume(response.getEntity());
return etag;
} finally {
if (client instanceof Closeable) {
Closeable t = (Closeable) client;
t.close();
}
response.close();
post.releaseConnection();
}
}

View File

@ -115,6 +115,11 @@ message FileChunk {
FileId source_fid = 8;
bytes cipher_key = 9;
bool is_compressed = 10;
bool is_chunk_manifest = 11; // content is a list of FileChunks
}
message FileChunkManifest {
repeated FileChunk chunks = 1;
}
message FileId {
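The new is_chunk_manifest flag means the chunk's content is not file data but a serialized FileChunkManifest listing further chunks, which keeps very large files from inflating the entry itself. A minimal decoding sketch, assuming the generated FilerProto classes from the Java client above and content that has already been fetched (and decompressed if is_compressed was set):

import seaweedfs.client.FilerProto;
import java.util.List;

public class ManifestDemo {
    // One level of manifest resolution; nested manifests would recurse.
    static List<FilerProto.FileChunk> readManifest(byte[] manifestBytes) throws Exception {
        FilerProto.FileChunkManifest manifest = FilerProto.FileChunkManifest.parseFrom(manifestBytes);
        return manifest.getChunksList();
    }
}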

View File

@ -127,7 +127,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
<seaweedfs.client.version>1.2.9</seaweedfs.client.version>
<seaweedfs.client.version>1.3.6</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>

View File

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
<seaweedfs.client.version>1.2.9</seaweedfs.client.version>
<seaweedfs.client.version>1.3.6</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>

View File

@ -1,137 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package seaweed.hdfs;
import java.util.concurrent.CountDownLatch;
class ReadBuffer {
private SeaweedInputStream stream;
private long offset; // offset within the file for the buffer
private int length; // actual length, set after the buffer is filled
private int requestedLength; // requested length of the read
private byte[] buffer; // the buffer itself
private int bufferindex = -1; // index in the buffers array in Buffer manager
private ReadBufferStatus status; // status of the buffer
private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client
// waiting on this buffer gets unblocked
// fields to help with eviction logic
private long timeStamp = 0; // tick at which buffer became available to read
private boolean isFirstByteConsumed = false;
private boolean isLastByteConsumed = false;
private boolean isAnyByteConsumed = false;
public SeaweedInputStream getStream() {
return stream;
}
public void setStream(SeaweedInputStream stream) {
this.stream = stream;
}
public long getOffset() {
return offset;
}
public void setOffset(long offset) {
this.offset = offset;
}
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public int getRequestedLength() {
return requestedLength;
}
public void setRequestedLength(int requestedLength) {
this.requestedLength = requestedLength;
}
public byte[] getBuffer() {
return buffer;
}
public void setBuffer(byte[] buffer) {
this.buffer = buffer;
}
public int getBufferindex() {
return bufferindex;
}
public void setBufferindex(int bufferindex) {
this.bufferindex = bufferindex;
}
public ReadBufferStatus getStatus() {
return status;
}
public void setStatus(ReadBufferStatus status) {
this.status = status;
}
public CountDownLatch getLatch() {
return latch;
}
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
public long getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
public boolean isFirstByteConsumed() {
return isFirstByteConsumed;
}
public void setFirstByteConsumed(boolean isFirstByteConsumed) {
this.isFirstByteConsumed = isFirstByteConsumed;
}
public boolean isLastByteConsumed() {
return isLastByteConsumed;
}
public void setLastByteConsumed(boolean isLastByteConsumed) {
this.isLastByteConsumed = isLastByteConsumed;
}
public boolean isAnyByteConsumed() {
return isAnyByteConsumed;
}
public void setAnyByteConsumed(boolean isAnyByteConsumed) {
this.isAnyByteConsumed = isAnyByteConsumed;
}
}

View File

@ -1,394 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package seaweed.hdfs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
/**
* The Read Buffer Manager for Rest AbfsClient.
*/
final class ReadBufferManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
private static final int NUM_BUFFERS = 16;
private static final int BLOCK_SIZE = 4 * 1024 * 1024;
private static final int NUM_THREADS = 8;
private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
private Thread[] threads = new Thread[NUM_THREADS];
private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available
private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
static {
BUFFER_MANAGER = new ReadBufferManager();
BUFFER_MANAGER.init();
}
static ReadBufferManager getBufferManager() {
return BUFFER_MANAGER;
}
private void init() {
buffers = new byte[NUM_BUFFERS][];
for (int i = 0; i < NUM_BUFFERS; i++) {
buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC
freeList.add(i);
}
for (int i = 0; i < NUM_THREADS; i++) {
Thread t = new Thread(new ReadBufferWorker(i));
t.setDaemon(true);
threads[i] = t;
t.setName("SeaweedFS-prefetch-" + i);
t.start();
}
ReadBufferWorker.UNLEASH_WORKERS.countDown();
}
// hide instance constructor
private ReadBufferManager() {
}
/*
*
* SeaweedInputStream-facing methods
*
*/
/**
* {@link SeaweedInputStream} calls this method to queue read-aheads.
*
* @param stream The {@link SeaweedInputStream} for which to do the read-ahead
* @param requestedOffset The offset in the file which should be read
* @param requestedLength The length to read
*/
void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
stream.getPath(), requestedOffset, requestedLength);
}
ReadBuffer buffer;
synchronized (this) {
if (isAlreadyQueued(stream, requestedOffset)) {
return; // already queued, do not queue again
}
if (freeList.isEmpty() && !tryEvict()) {
return; // no buffers available, cannot queue anything
}
buffer = new ReadBuffer();
buffer.setStream(stream);
buffer.setOffset(requestedOffset);
buffer.setLength(0);
buffer.setRequestedLength(requestedLength);
buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
buffer.setLatch(new CountDownLatch(1));
Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already
buffer.setBuffer(buffers[bufferIndex]);
buffer.setBufferindex(bufferIndex);
readAheadQueue.add(buffer);
notifyAll();
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
stream.getPath(), requestedOffset, buffer.getBufferindex());
}
}
/**
* {@link SeaweedInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
* remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading
* the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
* but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because
depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
read to get the data faster (compared to the read waiting in queue for an indeterminate amount of time).
*
* @param stream the file to read bytes for
* @param position the offset in the file to do a read for
* @param length the length to read
* @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
* @return the number of bytes read
*/
int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) {
// not synchronized, so have to be careful with locking
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("getBlock for file {} position {} thread {}",
stream.getPath(), position, Thread.currentThread().getName());
}
waitForProcess(stream, position);
int bytesRead = 0;
synchronized (this) {
bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
}
if (bytesRead > 0) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Done read from Cache for {} position {} length {}",
stream.getPath(), position, bytesRead);
}
return bytesRead;
}
// otherwise, just say we got nothing - calling thread can do its own read
return 0;
}
/*
*
* Internal methods
*
*/
private void waitForProcess(final SeaweedInputStream stream, final long position) {
ReadBuffer readBuf;
synchronized (this) {
clearFromReadAheadQueue(stream, position);
readBuf = getFromList(inProgressList, stream, position);
}
if (readBuf != null) { // if in in-progress queue, then block for it
try {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}",
stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
}
readBuf.getLatch().await(); // blocking wait on the caller stream's thread
// Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread
// is done processing it (in doneReading). There, the latch is set after removing the buffer from
// inProgressList. So this latch is safe to be outside the synchronized block.
// Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
// while waiting, so no one will be able to change any state. If this becomes more complex in the future,
// then the latch can be removed and replaced with wait/notify whenever inProgressList is touched.
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("latch done for file {} buffer idx {} length {}",
stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
}
}
}
/**
* If any buffer in the completed list can be reclaimed then reclaim it and return the buffer to the free list.
* The objective is to find just one buffer - there is no advantage to evicting more than one.
*
* @return whether the eviction succeeded - i.e., were we able to free up one buffer
*/
private synchronized boolean tryEvict() {
ReadBuffer nodeToEvict = null;
if (completedReadList.size() <= 0) {
return false; // there are no evict-able buffers
}
// first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
for (ReadBuffer buf : completedReadList) {
if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
nodeToEvict = buf;
break;
}
}
if (nodeToEvict != null) {
return evict(nodeToEvict);
}
// next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see)
for (ReadBuffer buf : completedReadList) {
if (buf.isAnyByteConsumed()) {
nodeToEvict = buf;
break;
}
}
if (nodeToEvict != null) {
return evict(nodeToEvict);
}
// next, try any old nodes that have not been consumed
long earliestBirthday = Long.MAX_VALUE;
for (ReadBuffer buf : completedReadList) {
if (buf.getTimeStamp() < earliestBirthday) {
nodeToEvict = buf;
earliestBirthday = buf.getTimeStamp();
}
}
if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
return evict(nodeToEvict);
}
// nothing can be evicted
return false;
}
private boolean evict(final ReadBuffer buf) {
freeList.push(buf.getBufferindex());
completedReadList.remove(buf);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
}
return true;
}
private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) {
// returns true if any part of the buffer is already queued
return (isInList(readAheadQueue, stream, requestedOffset)
|| isInList(inProgressList, stream, requestedOffset)
|| isInList(completedReadList, stream, requestedOffset));
}
private boolean isInList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
return (getFromList(list, stream, requestedOffset) != null);
}
private ReadBuffer getFromList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
for (ReadBuffer buffer : list) {
if (buffer.getStream() == stream) {
if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
&& requestedOffset >= buffer.getOffset()
&& requestedOffset < buffer.getOffset() + buffer.getLength()) {
return buffer;
} else if (requestedOffset >= buffer.getOffset()
&& requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) {
return buffer;
}
}
}
return null;
}
private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) {
ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
if (buffer != null) {
readAheadQueue.remove(buffer);
notifyAll(); // lock is held in calling method
freeList.push(buffer.getBufferindex());
}
}
private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length,
final byte[] buffer) {
ReadBuffer buf = getFromList(completedReadList, stream, position);
if (buf == null || position >= buf.getOffset() + buf.getLength()) {
return 0;
}
int cursor = (int) (position - buf.getOffset());
int availableLengthInBuffer = buf.getLength() - cursor;
int lengthToCopy = Math.min(length, availableLengthInBuffer);
System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
if (cursor == 0) {
buf.setFirstByteConsumed(true);
}
if (cursor + lengthToCopy == buf.getLength()) {
buf.setLastByteConsumed(true);
}
buf.setAnyByteConsumed(true);
return lengthToCopy;
}
/*
*
* ReadBufferWorker-thread-facing methods
*
*/
/**
* ReadBufferWorker thread calls this to get the next buffer that it should work on.
*
* @return {@link ReadBuffer}
* @throws InterruptedException if thread is interrupted
*/
ReadBuffer getNextBlockToRead() throws InterruptedException {
ReadBuffer buffer = null;
synchronized (this) {
//buffer = readAheadQueue.take(); // blocking method
while (readAheadQueue.size() == 0) {
wait();
}
buffer = readAheadQueue.remove();
notifyAll();
if (buffer == null) {
return null; // should never happen
}
buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
inProgressList.add(buffer);
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
buffer.getStream().getPath(), buffer.getOffset());
}
return buffer;
}
/**
* ReadBufferWorker thread calls this method to post completion.
*
* @param buffer the buffer whose read was completed
* @param result the {@link ReadBufferStatus} after the read operation in the worker thread
* @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
*/
void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead);
}
synchronized (this) {
inProgressList.remove(buffer);
if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
buffer.setStatus(ReadBufferStatus.AVAILABLE);
buffer.setTimeStamp(currentTimeMillis());
buffer.setLength(bytesActuallyRead);
completedReadList.add(buffer);
} else {
freeList.push(buffer.getBufferindex());
// buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
}
}
//outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
buffer.getLatch().countDown(); // wake up waiting threads (if any)
}
/**
* Similar to System.currentTimeMillis, except implemented with System.nanoTime().
* System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization),
* making it unsuitable for measuring time intervals. nanoTime is strictly monotonically increasing per CPU core.
* Note: it is not monotonic across sockets, and even within a CPU, it's only the
* more recent parts which share a clock across all cores.
*
* @return current time in milliseconds
*/
private long currentTimeMillis() {
return System.nanoTime() / 1000 / 1000;
}
}

View File

@ -1,70 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package seaweed.hdfs;
import java.util.concurrent.CountDownLatch;
class ReadBufferWorker implements Runnable {
protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
private int id;
ReadBufferWorker(final int id) {
this.id = id;
}
/**
* return the ID of ReadBufferWorker.
*/
public int getId() {
return this.id;
}
/**
* Waits until a buffer becomes available in ReadAheadQueue.
* Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager.
* Rinse and repeat. Forever.
*/
public void run() {
try {
UNLEASH_WORKERS.await();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
ReadBuffer buffer;
while (true) {
try {
buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return;
}
if (buffer != null) {
try {
// do the actual read, from the file.
int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
} catch (Exception ex) {
bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
}
}
}
}
}

View File

@ -18,12 +18,18 @@
package seaweed.hdfs;
/**
* The ReadBufferStatus for Rest AbfsClient
*/
public enum ReadBufferStatus {
NOT_AVAILABLE, // buffers sitting in readaheadqueue have this status
READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList
AVAILABLE, // data is available in buffer. It should be in completedList
READ_FAILED // read completed, but failed.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DelegateToFileSystem;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class SeaweedAbstractFileSystem extends DelegateToFileSystem {
SeaweedAbstractFileSystem(final URI uri, final Configuration conf)
throws IOException, URISyntaxException {
super(uri, new SeaweedFileSystem(), conf, "seaweedfs", false);
}
}

View File

@ -10,6 +10,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerProto;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -22,7 +23,7 @@ import java.util.Map;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
public class SeaweedFileSystem extends FileSystem {
public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
@ -144,7 +145,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
}
@Override
public boolean rename(Path src, Path dst) {
public boolean rename(Path src, Path dst) throws IOException {
LOG.debug("rename path: {} => {}", src, dst);
@ -155,12 +156,13 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
if (src.equals(dst)) {
return true;
}
FileStatus dstFileStatus = getFileStatus(dst);
FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(dst);
String sourceFileName = src.getName();
Path adjustedDst = dst;
if (dstFileStatus != null) {
if (entry != null) {
FileStatus dstFileStatus = getFileStatus(dst);
String sourceFileName = src.getName();
if (!dstFileStatus.isDirectory()) {
return false;
}
@ -175,18 +177,20 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
}
@Override
public boolean delete(Path path, boolean recursive) {
public boolean delete(Path path, boolean recursive) throws IOException {
LOG.debug("delete path: {} recursive:{}", path, recursive);
path = qualify(path);
FileStatus fileStatus = getFileStatus(path);
FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path);
if (fileStatus == null) {
if (entry == null) {
return true;
}
FileStatus fileStatus = getFileStatus(path);
return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive);
}
@ -222,9 +226,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
path = qualify(path);
FileStatus fileStatus = getFileStatus(path);
FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path);
if (fileStatus == null) {
if (entry == null) {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
return seaweedFileSystemStore.createDirectory(path, currentUser,
@ -233,6 +237,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
}
FileStatus fileStatus = getFileStatus(path);
if (fileStatus.isDirectory()) {
return true;
} else {
@ -241,7 +247,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
}
@Override
public FileStatus getFileStatus(Path path) {
public FileStatus getFileStatus(Path path) throws IOException {
LOG.debug("getFileStatus path: {}", path);

View File

@ -61,7 +61,7 @@ public class SeaweedFileSystemStore {
);
}
public FileStatus[] listEntries(final Path path) {
public FileStatus[] listEntries(final Path path) throws IOException {
LOG.debug("listEntries path: {}", path);
FileStatus pathStatus = getFileStatus(path);
@ -89,11 +89,11 @@ public class SeaweedFileSystemStore {
}
public FileStatus getFileStatus(final Path path) {
public FileStatus getFileStatus(final Path path) throws IOException {
FilerProto.Entry entry = lookupEntry(path);
if (entry == null) {
return null;
throw new FileNotFoundException("File does not exist: " + path);
}
LOG.debug("doGetFileStatus path:{} entry:{}", path, entry);
@ -136,7 +136,7 @@ public class SeaweedFileSystemStore {
modification_time, access_time, permission, owner, group, null, path);
}
private FilerProto.Entry lookupEntry(Path path) {
public FilerProto.Entry lookupEntry(Path path) {
return filerClient.lookupEntry(getParentDirectory(path), path.getName());
@ -212,7 +212,6 @@ public class SeaweedFileSystemStore {
LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
int readAheadQueueDepth = 2;
FilerProto.Entry entry = lookupEntry(path);
if (entry == null) {
@ -223,8 +222,7 @@ public class SeaweedFileSystemStore {
statistics,
path.toUri().getPath(),
entry,
bufferSize,
readAheadQueueDepth);
bufferSize);
}
public void setOwner(Path path, String owner, String group) {

View File

@ -27,16 +27,9 @@ public class SeaweedInputStream extends FSInputStream {
private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
private final long contentLength;
private final int bufferSize; // default buffer size
private final int readAheadQueueDepth; // initialized in constructor
private final boolean readAheadEnabled; // whether readAhead is enabled
private byte[] buffer = null; // will be initialized on first use
private long position = 0; // cursor of the file
private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server
private long fCursorAfterLastRead = -1;
private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer
private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1
// of valid bytes in buffer)
private boolean closed = false;
public SeaweedInputStream(
@ -44,16 +37,13 @@ public class SeaweedInputStream extends FSInputStream {
final Statistics statistics,
final String path,
final FilerProto.Entry entry,
final int bufferSize,
final int readAheadQueueDepth) {
final int bufferSize) {
this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics;
this.path = path;
this.entry = entry;
this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
this.bufferSize = bufferSize;
this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
this.readAheadEnabled = true;
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
@ -78,122 +68,7 @@ public class SeaweedInputStream extends FSInputStream {
@Override
public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
int currentOff = off;
int currentLen = len;
int lastReadBytes;
int totalReadBytes = 0;
do {
lastReadBytes = readOneBlock(b, currentOff, currentLen);
if (lastReadBytes > 0) {
currentOff += lastReadBytes;
currentLen -= lastReadBytes;
totalReadBytes += lastReadBytes;
}
if (currentLen <= 0 || currentLen > b.length - currentOff) {
break;
}
} while (lastReadBytes > 0);
return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
}
private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
Preconditions.checkNotNull(b);
if (len == 0) {
return 0;
}
if (this.available() == 0) {
return -1;
}
if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
//If buffer is empty, then fill the buffer.
if (bCursor == limit) {
//If EOF, then return -1
if (fCursor >= contentLength) {
return -1;
}
long bytesRead = 0;
//reset buffer to initial state - i.e., throw away existing data
bCursor = 0;
limit = 0;
if (buffer == null) {
buffer = new byte[bufferSize];
}
// Enable readAhead when reading sequentially
if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
} else {
bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
}
if (bytesRead == -1) {
return -1;
}
limit += bytesRead;
fCursor += bytesRead;
fCursorAfterLastRead = fCursor;
}
//If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
//(bytes returned may be less than requested)
int bytesRemaining = limit - bCursor;
int bytesToRead = Math.min(len, bytesRemaining);
System.arraycopy(buffer, bCursor, b, off, bytesToRead);
bCursor += bytesToRead;
if (statistics != null) {
statistics.incrementBytesRead(bytesToRead);
}
return bytesToRead;
}
private int readInternal(final long position, final byte[] b, final int offset, final int length,
final boolean bypassReadAhead) throws IOException {
if (readAheadEnabled && !bypassReadAhead) {
// try reading from read-ahead
if (offset != 0) {
throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
}
int receivedBytes;
// queue read-aheads
int numReadAheads = this.readAheadQueueDepth;
long nextSize;
long nextOffset = position;
while (numReadAheads > 0 && nextOffset < contentLength) {
nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
nextOffset = nextOffset + nextSize;
numReadAheads--;
}
// try reading from buffers first
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
if (receivedBytes > 0) {
return receivedBytes;
}
// got nothing from read-ahead, do our own read now
receivedBytes = readRemote(position, b, offset, length);
return receivedBytes;
} else {
return readRemote(position, b, offset, length);
}
}
int readRemote(long position, byte[] b, int offset, int length) throws IOException {
if (position < 0) {
throw new IllegalArgumentException("attempting to read from negative offset");
}
@ -203,21 +78,30 @@ public class SeaweedInputStream extends FSInputStream {
if (b == null) {
throw new IllegalArgumentException("null byte array passed in to read() method");
}
if (offset >= b.length) {
if (off >= b.length) {
throw new IllegalArgumentException("offset greater than length of array");
}
if (length < 0) {
if (len < 0) {
throw new IllegalArgumentException("requested read length is less than zero");
}
if (length > (b.length - offset)) {
if (len > (b.length - off)) {
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
}
long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length);
long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len);
if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length");
}
if (bytesRead > 0) {
this.position += bytesRead;
if (statistics != null) {
statistics.incrementBytesRead(bytesRead);
}
}
return (int)bytesRead;
}
/**
@ -239,17 +123,8 @@ public class SeaweedInputStream extends FSInputStream {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}
if (n >= fCursor - limit && n <= fCursor) { // within buffer
bCursor = (int) (n - (fCursor - limit));
return;
}
this.position = n;
// next read will read from here
fCursor = n;
//invalidate buffer
limit = 0;
bCursor = 0;
}
@Override
@ -257,20 +132,19 @@ public class SeaweedInputStream extends FSInputStream {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
long currentPos = getPos();
if (currentPos == contentLength) {
if (this.position == contentLength) {
if (n > 0) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}
}
long newPos = currentPos + n;
long newPos = this.position + n;
if (newPos < 0) {
newPos = 0;
n = newPos - currentPos;
n = newPos - this.position;
}
if (newPos > contentLength) {
newPos = contentLength;
n = newPos - currentPos;
n = newPos - this.position;
}
seek(newPos);
return n;
@ -321,7 +195,7 @@ public class SeaweedInputStream extends FSInputStream {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
return fCursor - limit + bCursor;
return position;
}
/**
@ -338,7 +212,6 @@ public class SeaweedInputStream extends FSInputStream {
@Override
public synchronized void close() throws IOException {
closed = true;
buffer = null; // de-reference the buffer so it can be GC'ed sooner
}
/**

View File

@ -7,6 +7,7 @@ import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import seaweedfs.client.ByteBufferPool;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedWrite;
@ -14,6 +15,7 @@ import seaweedfs.client.SeaweedWrite;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.*;
import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
@ -28,16 +30,16 @@ public class SeaweedOutputStream extends OutputStream {
private final int maxConcurrentRequestCount;
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
private FilerProto.Entry.Builder entry;
private final FilerProto.Entry.Builder entry;
private final boolean supportFlush = false; // true;
private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
private long position;
private boolean closed;
private boolean supportFlush = true;
private volatile IOException lastError;
private long lastFlushOffset;
private long lastTotalAppendOffset = 0;
private byte[] buffer;
private int bufferIndex;
private ConcurrentLinkedDeque<WriteOperation> writeOperations;
private ByteBuffer buffer;
private long outputIndex;
private String replication = "000";
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
@ -50,8 +52,8 @@ public class SeaweedOutputStream extends OutputStream {
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = bufferSize;
this.buffer = new byte[bufferSize];
this.bufferIndex = 0;
this.buffer = ByteBufferPool.request(bufferSize);
this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
@ -93,25 +95,29 @@ public class SeaweedOutputStream extends OutputStream {
throw new IndexOutOfBoundsException();
}
// System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
int currentOffset = off;
int writableBytes = bufferSize - bufferIndex;
int writableBytes = bufferSize - buffer.position();
int numberOfBytesToWrite = length;
while (numberOfBytesToWrite > 0) {
if (writableBytes <= numberOfBytesToWrite) {
System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
bufferIndex += writableBytes;
writeCurrentBufferToService();
currentOffset += writableBytes;
numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
} else {
System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
bufferIndex += numberOfBytesToWrite;
numberOfBytesToWrite = 0;
if (numberOfBytesToWrite < writableBytes) {
buffer.put(data, currentOffset, numberOfBytesToWrite);
outputIndex += numberOfBytesToWrite;
break;
}
writableBytes = bufferSize - bufferIndex;
// System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")");
buffer.put(data, currentOffset, writableBytes);
outputIndex += writableBytes;
currentOffset += writableBytes;
writeCurrentBufferToService();
numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
writableBytes = bufferSize - buffer.position();
}
}
/**
@ -147,8 +153,9 @@ public class SeaweedOutputStream extends OutputStream {
threadExecutor.shutdown();
} finally {
lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
ByteBufferPool.release(buffer);
buffer = null;
bufferIndex = 0;
outputIndex = 0;
closed = true;
writeOperations.clear();
if (!threadExecutor.isShutdown()) {
@ -158,35 +165,17 @@ public class SeaweedOutputStream extends OutputStream {
}
private synchronized void writeCurrentBufferToService() throws IOException {
if (bufferIndex == 0) {
if (buffer.position() == 0) {
return;
}
final byte[] bytes = buffer;
final int bytesLength = bufferIndex;
buffer = new byte[bufferSize];
bufferIndex = 0;
final long offset = position;
buffer.flip();
int bytesLength = buffer.limit() - buffer.position();
SeaweedWrite.writeData(entry, replication, filerGrpcClient, position, buffer.array(), buffer.position(), buffer.limit());
// System.out.println(path + " saved [" + (position) + "," + ((position) + bytesLength) + ")");
position += bytesLength;
buffer.clear();
if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
waitForTaskToComplete();
}
final Future<Void> job = completionService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
// originally: client.append(path, offset, bytes, 0, bytesLength);
SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
return null;
}
});
writeOperations.add(new WriteOperation(job, offset, bytesLength));
// Try to shrink the queue
shrinkWriteOperationQueue();
}
private void waitForTaskToComplete() throws IOException {
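The rewritten writeCurrentBufferToService drains the pooled ByteBuffer with the standard fill/flip/drain/clear cycle, and now writes synchronously instead of submitting to the completion service. A self-contained sketch of that cycle; the println stands in for SeaweedWrite.writeData:

import java.nio.ByteBuffer;

public class FlipCycleDemo {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.put(new byte[]{1, 2, 3});  // fill: position advances to 3
        buffer.flip();                    // switch to drain mode: limit=3, position=0
        int bytesLength = buffer.limit() - buffer.position();
        // stand-in for SeaweedWrite.writeData(entry, replication, filerGrpcClient,
        //         position, buffer.array(), buffer.position(), buffer.limit())
        System.out.println("writing " + bytesLength + " bytes");
        buffer.clear();                   // back to fill mode for the next round
    }
}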

View File

@ -127,7 +127,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
<seaweedfs.client.version>1.2.9</seaweedfs.client.version>
<seaweedfs.client.version>1.3.6</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>

View File

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
<seaweedfs.client.version>1.2.9</seaweedfs.client.version>
<seaweedfs.client.version>1.3.6</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>

View File

@ -1,137 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package seaweed.hdfs;
import java.util.concurrent.CountDownLatch;
class ReadBuffer {
private SeaweedInputStream stream;
private long offset; // offset within the file for the buffer
private int length; // actual length, set after the buffer is filled
private int requestedLength; // requested length of the read
private byte[] buffer; // the buffer itself
private int bufferindex = -1; // index in the buffers array in Buffer manager
private ReadBufferStatus status; // status of the buffer
private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client
// waiting on this buffer gets unblocked
// fields to help with eviction logic
private long timeStamp = 0; // tick at which buffer became available to read
private boolean isFirstByteConsumed = false;
private boolean isLastByteConsumed = false;
private boolean isAnyByteConsumed = false;
public SeaweedInputStream getStream() {
return stream;
}
public void setStream(SeaweedInputStream stream) {
this.stream = stream;
}
public long getOffset() {
return offset;
}
public void setOffset(long offset) {
this.offset = offset;
}
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public int getRequestedLength() {
return requestedLength;
}
public void setRequestedLength(int requestedLength) {
this.requestedLength = requestedLength;
}
public byte[] getBuffer() {
return buffer;
}
public void setBuffer(byte[] buffer) {
this.buffer = buffer;
}
public int getBufferindex() {
return bufferindex;
}
public void setBufferindex(int bufferindex) {
this.bufferindex = bufferindex;
}
public ReadBufferStatus getStatus() {
return status;
}
public void setStatus(ReadBufferStatus status) {
this.status = status;
}
public CountDownLatch getLatch() {
return latch;
}
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
public long getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
public boolean isFirstByteConsumed() {
return isFirstByteConsumed;
}
public void setFirstByteConsumed(boolean isFirstByteConsumed) {
this.isFirstByteConsumed = isFirstByteConsumed;
}
public boolean isLastByteConsumed() {
return isLastByteConsumed;
}
public void setLastByteConsumed(boolean isLastByteConsumed) {
this.isLastByteConsumed = isLastByteConsumed;
}
public boolean isAnyByteConsumed() {
return isAnyByteConsumed;
}
public void setAnyByteConsumed(boolean isAnyByteConsumed) {
this.isAnyByteConsumed = isAnyByteConsumed;
}
}

View File

@ -1,394 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package seaweed.hdfs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
/**
* The Read Buffer Manager for SeaweedFS read-ahead (adapted from the Hadoop ABFS client).
*/
final class ReadBufferManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
private static final int NUM_BUFFERS = 16;
private static final int BLOCK_SIZE = 4 * 1024 * 1024;
private static final int NUM_THREADS = 8;
private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
private Thread[] threads = new Thread[NUM_THREADS];
private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available
private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
static {
BUFFER_MANAGER = new ReadBufferManager();
BUFFER_MANAGER.init();
}
static ReadBufferManager getBufferManager() {
return BUFFER_MANAGER;
}
private void init() {
buffers = new byte[NUM_BUFFERS][];
for (int i = 0; i < NUM_BUFFERS; i++) {
buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC
freeList.add(i);
}
for (int i = 0; i < NUM_THREADS; i++) {
Thread t = new Thread(new ReadBufferWorker(i));
t.setDaemon(true);
threads[i] = t;
t.setName("SeaweedFS-prefetch-" + i);
t.start();
}
ReadBufferWorker.UNLEASH_WORKERS.countDown();
}
// hide instance constructor
private ReadBufferManager() {
}
/*
*
* SeaweedInputStream-facing methods
*
*/
/**
* {@link SeaweedInputStream} calls this method to queue read-aheads.
*
* @param stream The {@link SeaweedInputStream} for which to do the read-ahead
* @param requestedOffset The offset in the file which should be read
* @param requestedLength The length to read
*/
void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
stream.getPath(), requestedOffset, requestedLength);
}
ReadBuffer buffer;
synchronized (this) {
if (isAlreadyQueued(stream, requestedOffset)) {
return; // already queued, do not queue again
}
if (freeList.isEmpty() && !tryEvict()) {
return; // no buffers available, cannot queue anything
}
buffer = new ReadBuffer();
buffer.setStream(stream);
buffer.setOffset(requestedOffset);
buffer.setLength(0);
buffer.setRequestedLength(requestedLength);
buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
buffer.setLatch(new CountDownLatch(1));
Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already
buffer.setBuffer(buffers[bufferIndex]);
buffer.setBufferindex(bufferIndex);
readAheadQueue.add(buffer);
notifyAll();
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
stream.getPath(), requestedOffset, buffer.getBufferindex());
}
}
/**
* {@link SeaweedInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
* remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading
* the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
* but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because
* depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
* read to get the data faster (compared to the read waiting in queue for an indeterminate amount of time).
*
* @param stream the file to read bytes for
* @param position the offset in the file to do a read for
* @param length the length to read
* @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
* @return the number of bytes read
*/
int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) {
// not synchronized, so have to be careful with locking
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("getBlock for file {} position {} thread {}",
stream.getPath(), position, Thread.currentThread().getName());
}
waitForProcess(stream, position);
int bytesRead = 0;
synchronized (this) {
bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
}
if (bytesRead > 0) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Done read from Cache for {} position {} length {}",
stream.getPath(), position, bytesRead);
}
return bytesRead;
}
// otherwise, just say we got nothing - calling thread can do its own read
return 0;
}
/*
*
* Internal methods
*
*/
private void waitForProcess(final SeaweedInputStream stream, final long position) {
ReadBuffer readBuf;
synchronized (this) {
clearFromReadAheadQueue(stream, position);
readBuf = getFromList(inProgressList, stream, position);
}
if (readBuf != null) { // if in in-progress queue, then block for it
try {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}",
stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
}
readBuf.getLatch().await(); // blocking wait on the caller stream's thread
// Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread
// is done processing it (in doneReading). There, the latch is set after removing the buffer from
// inProgressList. So this latch is safe to be outside the synchronized block.
// Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
// while waiting, so no one will be able to change any state. If this becomes more complex in the future,
// then the latch can be removed and replaced with wait/notify whenever inProgressList is touched.
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("latch done for file {} buffer idx {} length {}",
stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
}
}
}
/**
* If any buffer in the completed list can be reclaimed then reclaim it and return the buffer to the free list.
* The objective is to find just one buffer - there is no advantage to evicting more than one.
*
* @return whether the eviction succeeded - i.e., were we able to free up one buffer
*/
private synchronized boolean tryEvict() {
ReadBuffer nodeToEvict = null;
if (completedReadList.size() <= 0) {
return false; // there are no evict-able buffers
}
// first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
for (ReadBuffer buf : completedReadList) {
if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
nodeToEvict = buf;
break;
}
}
if (nodeToEvict != null) {
return evict(nodeToEvict);
}
// next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see)
for (ReadBuffer buf : completedReadList) {
if (buf.isAnyByteConsumed()) {
nodeToEvict = buf;
break;
}
}
if (nodeToEvict != null) {
return evict(nodeToEvict);
}
// next, try any old nodes that have not been consumed
long earliestBirthday = Long.MAX_VALUE;
for (ReadBuffer buf : completedReadList) {
if (buf.getTimeStamp() < earliestBirthday) {
nodeToEvict = buf;
earliestBirthday = buf.getTimeStamp();
}
}
if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
return evict(nodeToEvict);
}
// nothing can be evicted
return false;
}
private boolean evict(final ReadBuffer buf) {
freeList.push(buf.getBufferindex());
completedReadList.remove(buf);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
}
return true;
}
private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) {
// returns true if any part of the buffer is already queued
return (isInList(readAheadQueue, stream, requestedOffset)
|| isInList(inProgressList, stream, requestedOffset)
|| isInList(completedReadList, stream, requestedOffset));
}
private boolean isInList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
return (getFromList(list, stream, requestedOffset) != null);
}
private ReadBuffer getFromList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
for (ReadBuffer buffer : list) {
if (buffer.getStream() == stream) {
if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
&& requestedOffset >= buffer.getOffset()
&& requestedOffset < buffer.getOffset() + buffer.getLength()) {
return buffer;
} else if (requestedOffset >= buffer.getOffset()
&& requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) {
return buffer;
}
}
}
return null;
}
private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) {
ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
if (buffer != null) {
readAheadQueue.remove(buffer);
notifyAll(); // lock is held in calling method
freeList.push(buffer.getBufferindex());
}
}
private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length,
final byte[] buffer) {
ReadBuffer buf = getFromList(completedReadList, stream, position);
if (buf == null || position >= buf.getOffset() + buf.getLength()) {
return 0;
}
int cursor = (int) (position - buf.getOffset());
int availableLengthInBuffer = buf.getLength() - cursor;
int lengthToCopy = Math.min(length, availableLengthInBuffer);
System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
if (cursor == 0) {
buf.setFirstByteConsumed(true);
}
if (cursor + lengthToCopy == buf.getLength()) {
buf.setLastByteConsumed(true);
}
buf.setAnyByteConsumed(true);
return lengthToCopy;
}
/*
*
* ReadBufferWorker-thread-facing methods
*
*/
/**
* ReadBufferWorker thread calls this to get the next buffer that it should work on.
*
* @return {@link ReadBuffer}
* @throws InterruptedException if thread is interrupted
*/
ReadBuffer getNextBlockToRead() throws InterruptedException {
ReadBuffer buffer = null;
synchronized (this) {
//buffer = readAheadQueue.take(); // blocking method
while (readAheadQueue.size() == 0) {
wait();
}
buffer = readAheadQueue.remove();
notifyAll();
if (buffer == null) {
return null; // should never happen
}
buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
inProgressList.add(buffer);
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
buffer.getStream().getPath(), buffer.getOffset());
}
return buffer;
}
/**
* ReadBufferWorker thread calls this method to post completion.
*
* @param buffer the buffer whose read was completed
* @param result the {@link ReadBufferStatus} after the read operation in the worker thread
* @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
*/
void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead);
}
synchronized (this) {
inProgressList.remove(buffer);
if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
buffer.setStatus(ReadBufferStatus.AVAILABLE);
buffer.setTimeStamp(currentTimeMillis());
buffer.setLength(bytesActuallyRead);
completedReadList.add(buffer);
} else {
freeList.push(buffer.getBufferindex());
// buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
}
}
//outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
buffer.getLatch().countDown(); // wake up waiting threads (if any)
}
/**
* Similar to System.currentTimeMillis, except implemented with System.nanoTime().
* System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization),
* making it unsuitable for measuring time intervals. nanoTime() is strictly monotonically increasing per CPU core.
* Note: it is not monotonic across sockets, and even within a CPU, it's only the
* more recent parts which share a clock across all cores.
*
* @return current time in milliseconds
*/
private long currentTimeMillis() {
return System.nanoTime() / 1000 / 1000;
}
}
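Before its removal in this commit, the manager coordinated a fixed pool of buffers, a queue of read-ahead requests, worker threads, and a per-request latch. A minimal Go sketch of the same coordination pattern; all names and sizes here are illustrative, not from SeaweedFS, and eviction plus buffer recycling are omitted for brevity:

package main

import (
	"fmt"
	"sync"
)

// readBuffer plays the role of the Java ReadBuffer; the done channel
// replaces the CountDownLatch.
type readBuffer struct {
	offset int64
	data   []byte
	done   chan struct{}
}

type readAheadManager struct {
	mu       sync.Mutex
	pool     chan []byte      // free buffers (the freeList)
	queue    chan *readBuffer // requests not yet picked up (the readAheadQueue)
	inflight map[int64]*readBuffer
}

func newManager(numBuffers, blockSize, numWorkers int, read func(off int64, b []byte) int) *readAheadManager {
	m := &readAheadManager{
		pool:     make(chan []byte, numBuffers),
		queue:    make(chan *readBuffer, numBuffers),
		inflight: make(map[int64]*readBuffer),
	}
	for i := 0; i < numBuffers; i++ {
		m.pool <- make([]byte, blockSize)
	}
	for i := 0; i < numWorkers; i++ {
		go func() { // the ReadBufferWorker loop
			for req := range m.queue {
				n := read(req.offset, req.data)
				req.data = req.data[:n]
				close(req.done) // wake any reader blocked on this request
			}
		}()
	}
	return m
}

// queueReadAhead never blocks: if no buffer is free, the request is
// simply dropped and the caller falls back to its own remote read.
func (m *readAheadManager) queueReadAhead(offset int64) {
	select {
	case buf := <-m.pool:
		req := &readBuffer{offset: offset, data: buf, done: make(chan struct{})}
		m.mu.Lock()
		m.inflight[offset] = req
		m.mu.Unlock()
		m.queue <- req
	default:
	}
}

// getBlock is the cache-read path: block on the request if one is in
// flight for this offset, otherwise report a miss.
func (m *readAheadManager) getBlock(offset int64) ([]byte, bool) {
	m.mu.Lock()
	req := m.inflight[offset]
	m.mu.Unlock()
	if req == nil {
		return nil, false
	}
	<-req.done
	return req.data, true
}

func main() {
	src := []byte("hello, read-ahead")
	m := newManager(4, 8, 2, func(off int64, b []byte) int {
		return copy(b, src[off:])
	})
	m.queueReadAhead(0)
	if data, ok := m.getBlock(0); ok {
		fmt.Printf("prefetched %q\n", data)
	}
}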

View File

@ -1,70 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package seaweed.hdfs;
import java.util.concurrent.CountDownLatch;
class ReadBufferWorker implements Runnable {
protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
private int id;
ReadBufferWorker(final int id) {
this.id = id;
}
/**
* return the ID of ReadBufferWorker.
*/
public int getId() {
return this.id;
}
/**
* Waits until a buffer becomes available in ReadAheadQueue.
* Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager.
* Rinse and repeat. Forever.
*/
public void run() {
try {
UNLEASH_WORKERS.await();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
ReadBuffer buffer;
while (true) {
try {
buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return;
}
if (buffer != null) {
try {
// do the actual read, from the file.
int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
} catch (Exception ex) {
bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
}
}
}
}
}

View File

@ -18,12 +18,18 @@
package seaweed.hdfs;
/**
* The ReadBufferStatus for the SeaweedFS read-ahead pipeline (adapted from the Hadoop ABFS client).
*/
public enum ReadBufferStatus {
NOT_AVAILABLE, // buffers sitting in the readAheadQueue have this status
READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList
AVAILABLE, // data is available in buffer. It should be in completedList
READ_FAILED // read completed, but failed.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DelegateToFileSystem;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class SeaweedAbstractFileSystem extends DelegateToFileSystem {
SeaweedAbstractFileSystem(final URI uri, final Configuration conf)
throws IOException, URISyntaxException {
super(uri, new SeaweedFileSystem(), conf, "seaweedfs", false);
}
}
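The new DelegateToFileSystem subclass is what lets the seaweedfs:// scheme be used through Hadoop's newer FileContext API. Wiring it up would follow Hadoop's standard binding keys (fs.&lt;scheme&gt;.impl for a FileSystem, fs.AbstractFileSystem.&lt;scheme&gt;.impl for an AbstractFileSystem); the key names below are Hadoop's convention, not taken from this diff. A core-site.xml sketch:

<!-- core-site.xml: bind the seaweedfs:// scheme -->
<property>
  <name>fs.seaweedfs.impl</name>
  <value>seaweed.hdfs.SeaweedFileSystem</value>
</property>
<property>
  <name>fs.AbstractFileSystem.seaweedfs.impl</name>
  <value>seaweed.hdfs.SeaweedAbstractFileSystem</value>
</property>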

View File

@ -10,6 +10,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerProto;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -144,7 +145,7 @@ public class SeaweedFileSystem extends FileSystem {
}
@Override
public boolean rename(Path src, Path dst) {
public boolean rename(Path src, Path dst) throws IOException {
LOG.debug("rename path: {} => {}", src, dst);
@ -155,12 +156,13 @@ public class SeaweedFileSystem extends FileSystem {
if (src.equals(dst)) {
return true;
}
FileStatus dstFileStatus = getFileStatus(dst);
FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(dst);
String sourceFileName = src.getName();
Path adjustedDst = dst;
if (dstFileStatus != null) {
if (entry != null) {
FileStatus dstFileStatus = getFileStatus(dst);
String sourceFileName = src.getName();
if (!dstFileStatus.isDirectory()) {
return false;
}
@ -175,18 +177,20 @@ public class SeaweedFileSystem extends FileSystem {
}
@Override
public boolean delete(Path path, boolean recursive) {
public boolean delete(Path path, boolean recursive) throws IOException {
LOG.debug("delete path: {} recursive:{}", path, recursive);
path = qualify(path);
FileStatus fileStatus = getFileStatus(path);
FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path);
if (fileStatus == null) {
if (entry == null) {
return true;
}
FileStatus fileStatus = getFileStatus(path);
return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive);
}
@ -222,9 +226,9 @@ public class SeaweedFileSystem extends FileSystem {
path = qualify(path);
FileStatus fileStatus = getFileStatus(path);
FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path);
if (fileStatus == null) {
if (entry == null) {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
return seaweedFileSystemStore.createDirectory(path, currentUser,
@ -233,6 +237,8 @@ public class SeaweedFileSystem extends FileSystem {
}
FileStatus fileStatus = getFileStatus(path);
if (fileStatus.isDirectory()) {
return true;
} else {
@ -241,7 +247,7 @@ public class SeaweedFileSystem extends FileSystem {
}
@Override
public FileStatus getFileStatus(Path path) {
public FileStatus getFileStatus(Path path) throws IOException {
LOG.debug("getFileStatus path: {}", path);

View File

@ -61,7 +61,7 @@ public class SeaweedFileSystemStore {
);
}
public FileStatus[] listEntries(final Path path) {
public FileStatus[] listEntries(final Path path) throws IOException {
LOG.debug("listEntries path: {}", path);
FileStatus pathStatus = getFileStatus(path);
@ -89,11 +89,11 @@ public class SeaweedFileSystemStore {
}
public FileStatus getFileStatus(final Path path) {
public FileStatus getFileStatus(final Path path) throws IOException {
FilerProto.Entry entry = lookupEntry(path);
if (entry == null) {
return null;
throw new FileNotFoundException("File does not exist: " + path);
}
LOG.debug("doGetFileStatus path:{} entry:{}", path, entry);
@ -136,7 +136,7 @@ public class SeaweedFileSystemStore {
modification_time, access_time, permission, owner, group, null, path);
}
private FilerProto.Entry lookupEntry(Path path) {
public FilerProto.Entry lookupEntry(Path path) {
return filerClient.lookupEntry(getParentDirectory(path), path.getName());
@ -212,7 +212,6 @@ public class SeaweedFileSystemStore {
LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
int readAheadQueueDepth = 2;
FilerProto.Entry entry = lookupEntry(path);
if (entry == null) {
@ -223,8 +222,7 @@ public class SeaweedFileSystemStore {
statistics,
path.toUri().getPath(),
entry,
bufferSize,
readAheadQueueDepth);
bufferSize);
}
public void setOwner(Path path, String owner, String group) {

View File

@ -27,16 +27,9 @@ public class SeaweedInputStream extends FSInputStream {
private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
private final long contentLength;
private final int bufferSize; // default buffer size
private final int readAheadQueueDepth; // initialized in constructor
private final boolean readAheadEnabled; // whether read-ahead is enabled
private byte[] buffer = null; // will be initialized on first use
private long position = 0; // cursor of the file
private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server
private long fCursorAfterLastRead = -1;
private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer
private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1
// of valid bytes in buffer)
private boolean closed = false;
public SeaweedInputStream(
@ -44,16 +37,13 @@ public class SeaweedInputStream extends FSInputStream {
final Statistics statistics,
final String path,
final FilerProto.Entry entry,
final int bufferSize,
final int readAheadQueueDepth) {
final int bufferSize) {
this.filerGrpcClient = filerGrpcClient;
this.statistics = statistics;
this.path = path;
this.entry = entry;
this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
this.bufferSize = bufferSize;
this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
this.readAheadEnabled = true;
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
@ -78,122 +68,7 @@ public class SeaweedInputStream extends FSInputStream {
@Override
public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
int currentOff = off;
int currentLen = len;
int lastReadBytes;
int totalReadBytes = 0;
do {
lastReadBytes = readOneBlock(b, currentOff, currentLen);
if (lastReadBytes > 0) {
currentOff += lastReadBytes;
currentLen -= lastReadBytes;
totalReadBytes += lastReadBytes;
}
if (currentLen <= 0 || currentLen > b.length - currentOff) {
break;
}
} while (lastReadBytes > 0);
return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
}
private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
Preconditions.checkNotNull(b);
if (len == 0) {
return 0;
}
if (this.available() == 0) {
return -1;
}
if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
//If buffer is empty, then fill the buffer.
if (bCursor == limit) {
//If EOF, then return -1
if (fCursor >= contentLength) {
return -1;
}
long bytesRead = 0;
//reset buffer to initial state - i.e., throw away existing data
bCursor = 0;
limit = 0;
if (buffer == null) {
buffer = new byte[bufferSize];
}
// Enable readAhead when reading sequentially
if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
} else {
bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
}
if (bytesRead == -1) {
return -1;
}
limit += bytesRead;
fCursor += bytesRead;
fCursorAfterLastRead = fCursor;
}
//If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
//(bytes returned may be less than requested)
int bytesRemaining = limit - bCursor;
int bytesToRead = Math.min(len, bytesRemaining);
System.arraycopy(buffer, bCursor, b, off, bytesToRead);
bCursor += bytesToRead;
if (statistics != null) {
statistics.incrementBytesRead(bytesToRead);
}
return bytesToRead;
}
private int readInternal(final long position, final byte[] b, final int offset, final int length,
final boolean bypassReadAhead) throws IOException {
if (readAheadEnabled && !bypassReadAhead) {
// try reading from read-ahead
if (offset != 0) {
throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
}
int receivedBytes;
// queue read-aheads
int numReadAheads = this.readAheadQueueDepth;
long nextSize;
long nextOffset = position;
while (numReadAheads > 0 && nextOffset < contentLength) {
nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
nextOffset = nextOffset + nextSize;
numReadAheads--;
}
// try reading from buffers first
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
if (receivedBytes > 0) {
return receivedBytes;
}
// got nothing from read-ahead, do our own read now
receivedBytes = readRemote(position, b, offset, length);
return receivedBytes;
} else {
return readRemote(position, b, offset, length);
}
}
int readRemote(long position, byte[] b, int offset, int length) throws IOException {
if (position < 0) {
throw new IllegalArgumentException("attempting to read from negative offset");
}
@ -203,21 +78,30 @@ public class SeaweedInputStream extends FSInputStream {
if (b == null) {
throw new IllegalArgumentException("null byte array passed in to read() method");
}
if (offset >= b.length) {
if (off >= b.length) {
throw new IllegalArgumentException("offset greater than length of array");
}
if (length < 0) {
if (len < 0) {
throw new IllegalArgumentException("requested read length is less than zero");
}
if (length > (b.length - offset)) {
if (len > (b.length - off)) {
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
}
long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length);
long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len);
if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length");
}
if (bytesRead > 0) {
this.position += bytesRead;
if (statistics != null) {
statistics.incrementBytesRead(bytesRead);
}
}
return (int)bytesRead;
}
/**
@ -239,17 +123,8 @@ public class SeaweedInputStream extends FSInputStream {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}
if (n >= fCursor - limit && n <= fCursor) { // within buffer
bCursor = (int) (n - (fCursor - limit));
return;
}
this.position = n;
// next read will read from here
fCursor = n;
//invalidate buffer
limit = 0;
bCursor = 0;
}
@Override
@ -257,20 +132,19 @@ public class SeaweedInputStream extends FSInputStream {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
long currentPos = getPos();
if (currentPos == contentLength) {
if (this.position == contentLength) {
if (n > 0) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}
}
long newPos = currentPos + n;
long newPos = this.position + n;
if (newPos < 0) {
newPos = 0;
n = newPos - currentPos;
n = newPos - this.position;
}
if (newPos > contentLength) {
newPos = contentLength;
n = newPos - currentPos;
n = newPos - this.position;
}
seek(newPos);
return n;
@ -321,7 +195,7 @@ public class SeaweedInputStream extends FSInputStream {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
return fCursor - limit + bCursor;
return position;
}
/**
@ -338,7 +212,6 @@ public class SeaweedInputStream extends FSInputStream {
@Override
public synchronized void close() throws IOException {
closed = true;
buffer = null; // de-reference the buffer so it can be GC'ed sooner
}
/**

View File

@ -9,6 +9,7 @@ import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import seaweedfs.client.ByteBufferPool;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
import seaweedfs.client.SeaweedWrite;
@ -16,14 +17,10 @@ import seaweedfs.client.SeaweedWrite;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
@ -37,16 +34,16 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
private final int maxConcurrentRequestCount;
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
private FilerProto.Entry.Builder entry;
private final FilerProto.Entry.Builder entry;
private final boolean supportFlush = false; // true;
private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
private long position;
private boolean closed;
private boolean supportFlush = true;
private volatile IOException lastError;
private long lastFlushOffset;
private long lastTotalAppendOffset = 0;
private byte[] buffer;
private int bufferIndex;
private ConcurrentLinkedDeque<WriteOperation> writeOperations;
private ByteBuffer buffer;
private long outputIndex;
private String replication = "000";
public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
@ -59,8 +56,8 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = bufferSize;
this.buffer = new byte[bufferSize];
this.bufferIndex = 0;
this.buffer = ByteBufferPool.request(bufferSize);
this.outputIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
@ -102,25 +99,29 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
throw new IndexOutOfBoundsException();
}
// System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
int currentOffset = off;
int writableBytes = bufferSize - bufferIndex;
int writableBytes = bufferSize - buffer.position();
int numberOfBytesToWrite = length;
while (numberOfBytesToWrite > 0) {
if (writableBytes <= numberOfBytesToWrite) {
System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
bufferIndex += writableBytes;
writeCurrentBufferToService();
currentOffset += writableBytes;
numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
} else {
System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
bufferIndex += numberOfBytesToWrite;
numberOfBytesToWrite = 0;
if (numberOfBytesToWrite < writableBytes) {
buffer.put(data, currentOffset, numberOfBytesToWrite);
outputIndex += numberOfBytesToWrite;
break;
}
writableBytes = bufferSize - bufferIndex;
// System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ")");
buffer.put(data, currentOffset, writableBytes);
outputIndex += writableBytes;
currentOffset += writableBytes;
writeCurrentBufferToService();
numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
writableBytes = bufferSize - buffer.position();
}
}
/**
@ -199,8 +200,9 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
threadExecutor.shutdown();
} finally {
lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
ByteBufferPool.release(buffer);
buffer = null;
bufferIndex = 0;
outputIndex = 0;
closed = true;
writeOperations.clear();
if (!threadExecutor.isShutdown()) {
@ -210,35 +212,17 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
}
private synchronized void writeCurrentBufferToService() throws IOException {
if (bufferIndex == 0) {
if (buffer.position() == 0) {
return;
}
final byte[] bytes = buffer;
final int bytesLength = bufferIndex;
buffer = new byte[bufferSize];
bufferIndex = 0;
final long offset = position;
buffer.flip();
int bytesLength = buffer.limit() - buffer.position();
SeaweedWrite.writeData(entry, replication, filerGrpcClient, position, buffer.array(), buffer.position(), buffer.limit());
// System.out.println(path + " saved [" + (position) + "," + ((position) + bytesLength) + ")");
position += bytesLength;
buffer.clear();
if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
waitForTaskToComplete();
}
final Future<Void> job = completionService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
// originally: client.append(path, offset, bytes, 0, bytesLength);
SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
return null;
}
});
writeOperations.add(new WriteOperation(job, offset, bytesLength));
// Try to shrink the queue
shrinkWriteOperationQueue();
}
private void waitForTaskToComplete() throws IOException {
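The rewritten write path fills one pooled ByteBuffer and flushes it in place (flip, write, clear) instead of handing a fresh byte[] to an async task. A self-contained Go sketch of the same fill-flush-recycle pattern, with a print standing in for SeaweedWrite.writeData and sync.Pool playing the role of ByteBufferPool:

package main

import (
	"fmt"
	"sync"
)

var pool = sync.Pool{New: func() interface{} { return make([]byte, 0, 8) }}

type outputStream struct {
	buf      []byte
	position int64 // file offset of the first unflushed byte
}

func (o *outputStream) Write(data []byte) {
	for len(data) > 0 {
		writable := cap(o.buf) - len(o.buf)
		if len(data) < writable {
			o.buf = append(o.buf, data...) // fits: just buffer it
			return
		}
		o.buf = append(o.buf, data[:writable]...)
		data = data[writable:]
		o.flush()
	}
}

func (o *outputStream) flush() {
	if len(o.buf) == 0 {
		return
	}
	// stand-in for SeaweedWrite.writeData(entry, replication, client, ...)
	fmt.Printf("write [%d,%d)\n", o.position, o.position+int64(len(o.buf)))
	o.position += int64(len(o.buf))
	o.buf = o.buf[:0] // the ByteBuffer clear()
}

func (o *outputStream) Close() {
	o.flush()
	pool.Put(o.buf[:0]) // return the buffer, like ByteBufferPool.release
	o.buf = nil
}

func main() {
	o := &outputStream{buf: pool.Get().([]byte)}
	o.Write([]byte("0123456789abcdef0123"))
	o.Close()
}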

test/s3/basic/basic_test.go Normal file
View File

@ -0,0 +1,111 @@
package basic
import (
"fmt"
"os"
"strings"
"testing"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
var (
svc *s3.S3
)
func init() {
// Initialize a session in us-west-2 that the SDK will use to load
// credentials from the shared credentials file ~/.aws/credentials.
sess, err := session.NewSession(&aws.Config{
Region: aws.String("us-west-2"),
Endpoint: aws.String("localhost:8333"),
DisableSSL: aws.Bool(true),
})
if err != nil {
exitErrorf("create session, %v", err)
}
// Create S3 service client
svc = s3.New(sess)
}
func TestCreateBucket(t *testing.T) {
input := &s3.CreateBucketInput{
Bucket: aws.String("theBucket"),
}
result, err := svc.CreateBucket(input)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeBucketAlreadyExists:
fmt.Println(s3.ErrCodeBucketAlreadyExists, aerr.Error())
case s3.ErrCodeBucketAlreadyOwnedByYou:
fmt.Println(s3.ErrCodeBucketAlreadyOwnedByYou, aerr.Error())
default:
fmt.Println(aerr.Error())
}
} else {
// Print the error, cast err to awserr.Error to get the Code and
// Message from an error.
fmt.Println(err.Error())
}
return
}
fmt.Println(result)
}
func TestPutObject(t *testing.T) {
input := &s3.PutObjectInput{
ACL: aws.String("authenticated-read"),
Body: aws.ReadSeekCloser(strings.NewReader("filetoupload")),
Bucket: aws.String("theBucket"),
Key: aws.String("exampleobject"),
}
result, err := svc.PutObject(input)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
default:
fmt.Println(aerr.Error())
}
} else {
// Print the error, cast err to awserr.Error to get the Code and
// Message from an error.
fmt.Println(err.Error())
}
return
}
fmt.Println(result)
}
func TestListBuckets(t *testing.T) {
result, err := svc.ListBuckets(nil)
if err != nil {
exitErrorf("Unable to list buckets, %v", err)
}
fmt.Println("Buckets:")
for _, b := range result.Buckets {
fmt.Printf("* %s created on %s\n",
aws.StringValue(b.Name), aws.TimeValue(b.CreationDate))
}
}
func exitErrorf(msg string, args ...interface{}) {
fmt.Fprintf(os.Stderr, msg+"\n", args...)
os.Exit(1)
}

View File

@ -112,7 +112,7 @@ func runBackup(cmd *Command, args []string) bool {
return true
}
}
v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
v, err := storage.NewVolume(util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
if err != nil {
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
return true
@ -137,7 +137,7 @@ func runBackup(cmd *Command, args []string) bool {
// remove the old data
v.Destroy()
// recreate an empty volume
v, err = storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
v, err = storage.NewVolume(util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
if err != nil {
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
return true
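These repeated util.ResolvePath wrappers, which continue through the command hunks below, normalize user-supplied directory flags before use. A minimal sketch of what such a helper plausibly does, under the assumption that it only expands a leading "~"; the real helper lives in weed/util:

package main

import (
	"fmt"
	"os/user"
	"path/filepath"
	"strings"
)

// resolvePath is a hypothetical stand-in for util.ResolvePath.
func resolvePath(path string) string {
	if !strings.HasPrefix(path, "~") {
		return path
	}
	usr, err := user.Current()
	if err != nil {
		return path // cannot resolve the home dir: leave the path untouched
	}
	return filepath.Join(usr.HomeDir, strings.TrimPrefix(path, "~"))
}

func main() {
	fmt.Println(resolvePath("~/seaweedfs/volumes"))
	fmt.Println(resolvePath("/var/lib/seaweedfs")) // absolute paths pass through
}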

View File

@ -4,6 +4,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@ -40,7 +41,7 @@ func runCompact(cmd *Command, args []string) bool {
preallocate := *compactVolumePreallocate * (1 << 20)
vid := needle.VolumeId(*compactVolumeId)
v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid,
v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid,
storage.NeedleMapInMemory, nil, nil, preallocate, 0)
if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err)

View File

@ -43,7 +43,7 @@ var cmdDownload = &Command{
func runDownload(cmd *Command, args []string) bool {
for _, fid := range args {
if e := downloadToFile(*d.server, fid, *d.dir); e != nil {
if e := downloadToFile(*d.server, fid, util.ResolvePath(*d.dir)); e != nil {
fmt.Println("Download Error: ", fid, e)
}
}

View File

@ -198,7 +198,7 @@ func runExport(cmd *Command, args []string) bool {
needleMap := needle_map.NewMemDb()
defer needleMap.Close()
if err := needleMap.LoadFromIdx(path.Join(*export.dir, fileName+".idx")); err != nil {
if err := needleMap.LoadFromIdx(path.Join(util.ResolvePath(*export.dir), fileName+".idx")); err != nil {
glog.Fatalf("cannot load needle map from %s.idx: %s", fileName, err)
}
@ -211,7 +211,7 @@ func runExport(cmd *Command, args []string) bool {
fmt.Printf("key\tname\tsize\tgzip\tmime\tmodified\tttl\tdeleted\n")
}
err = storage.ScanVolumeFile(*export.dir, *export.collection, vid, storage.NeedleMapInMemory, volumeFileScanner)
err = storage.ScanVolumeFile(util.ResolvePath(*export.dir), *export.collection, vid, storage.NeedleMapInMemory, volumeFileScanner)
if err != nil && err != io.EOF {
glog.Fatalf("Export Volume File [ERROR] %s\n", err)
}

View File

@ -100,7 +100,7 @@ func (fo *FilerOptions) startFiler() {
defaultLevelDbDirectory := "./filerldb2"
if fo.defaultLevelDbDirectory != nil {
defaultLevelDbDirectory = *fo.defaultLevelDbDirectory + "/filerldb2"
defaultLevelDbDirectory = util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2")
}
var peers []string

View File

@ -11,6 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@ -67,7 +68,7 @@ func runFix(cmd *Command, args []string) bool {
if *fixVolumeCollection != "" {
baseFileName = *fixVolumeCollection + "_" + baseFileName
}
indexFileName := path.Join(*fixVolumePath, baseFileName+".idx")
indexFileName := path.Join(util.ResolvePath(*fixVolumePath), baseFileName+".idx")
nm := needle_map.NewMemDb()
defer nm.Close()
@ -77,7 +78,7 @@ func runFix(cmd *Command, args []string) bool {
nm: nm,
}
if err := storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner); err != nil {
if err := storage.ScanVolumeFile(util.ResolvePath(*fixVolumePath), *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner); err != nil {
glog.Fatalf("scan .dat File: %v", err)
os.Remove(indexFileName)
}

View File

@ -8,10 +8,11 @@ import (
"strings"
"github.com/chrislusf/raft/protobuf"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"github.com/gorilla/mux"
"google.golang.org/grpc/reflection"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@ -85,7 +86,7 @@ func runMaster(cmd *Command, args []string) bool {
runtime.GOMAXPROCS(runtime.NumCPU())
grace.SetupProfiling(*masterCpuProfile, *masterMemProfile)
if err := util.TestFolderWritable(*m.metaFolder); err != nil {
if err := util.TestFolderWritable(util.ResolvePath(*m.metaFolder)); err != nil {
glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *m.metaFolder, err)
}
@ -118,7 +119,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
}
// start raftServer
raftServer := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"),
peers, myMasterAddress, *masterOption.metaFolder, ms.Topo, 5)
peers, myMasterAddress, util.ResolvePath(*masterOption.metaFolder), ms.Topo, 5)
if raftServer == nil {
glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *masterOption.metaFolder)
}

View File

@ -13,6 +13,9 @@ import (
"strings"
"time"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
"github.com/chrislusf/seaweedfs/weed/filesys"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
@ -20,8 +23,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
)
func runMount(cmd *Command, args []string) bool {
@ -68,7 +69,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
}
filerMountRootPath := *option.filerMountRootPath
dir := *option.dir
dir := util.ResolvePath(*option.dir)
chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
util.LoadConfiguration("security", false)
@ -97,6 +98,9 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
mountMode = os.ModeDir | fileInfo.Mode()
uid, gid = util.GetFileUidGid(fileInfo)
fmt.Printf("mount point owner uid=%d gid=%d mode=%s\n", uid, gid, fileInfo.Mode())
} else {
fmt.Printf("can not stat %s\n", dir)
return false
}
if uid == 0 {

View File

@ -176,7 +176,7 @@ func runServer(cmd *Command, args []string) bool {
if *masterOptions.metaFolder == "" {
*masterOptions.metaFolder = folders[0]
}
if err := util.TestFolderWritable(*masterOptions.metaFolder); err != nil {
if err := util.TestFolderWritable(util.ResolvePath(*masterOptions.metaFolder)); err != nil {
glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterOptions.metaFolder, err)
}
filerOptions.defaultLevelDbDirectory = masterOptions.metaFolder

View File

@ -69,7 +69,7 @@ func runUpload(cmd *Command, args []string) bool {
if *upload.dir == "" {
return false
}
filepath.Walk(*upload.dir, func(path string, info os.FileInfo, err error) error {
filepath.Walk(util.ResolvePath(*upload.dir), func(path string, info os.FileInfo, err error) error {
if err == nil {
if !info.IsDir() {
if *upload.include != "" {

View File

@ -117,7 +117,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
// Set multiple folders and each folder's max volume count limit
v.folders = strings.Split(volumeFolders, ",")
for _, folder := range v.folders {
if err := util.TestFolderWritable(folder); err != nil {
if err := util.TestFolderWritable(util.ResolvePath(folder)); err != nil {
glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err)
}
}

View File

@ -110,7 +110,7 @@ func (wo *WebDavOption) startWebDav() bool {
Uid: uid,
Gid: gid,
Cipher: cipher,
CacheDir: *wo.cacheDir,
CacheDir: util.ResolvePath(*wo.cacheDir),
CacheSizeMB: *wo.cacheSizeMB,
})
if webdavServer_err != nil {

View File

@ -26,7 +26,7 @@ var (
)
type Filer struct {
store *FilerStoreWrapper
Store *FilerStoreWrapper
directoryCache *ccache.Cache
MasterClient *wdclient.MasterClient
fileIdDeletionQueue *util.UnboundedQueue
@ -38,9 +38,11 @@ type Filer struct {
LocalMetaLogBuffer *log_buffer.LogBuffer
metaLogCollection string
metaLogReplication string
MetaAggregator *MetaAggregator
}
func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer {
func NewFiler(masters []string, grpcDialOption grpc.DialOption,
filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer {
f := &Filer{
directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters),
@ -56,12 +58,23 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerHost string
return f
}
func (f *Filer) AggregateFromPeers(self string, filers []string) {
// set peers
if len(filers) == 0 {
filers = append(filers, self)
}
f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption)
f.MetaAggregator.StartLoopSubscribe(f, self)
}
func (f *Filer) SetStore(store FilerStore) {
f.store = NewFilerStoreWrapper(store)
f.Store = NewFilerStoreWrapper(store)
}
func (f *Filer) GetStore() (store FilerStore) {
return f.store
return f.Store
}
func (f *Filer) DisableDirectoryCache() {
@ -77,15 +90,15 @@ func (fs *Filer) KeepConnectedToMaster() {
}
func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) {
return f.store.BeginTransaction(ctx)
return f.Store.BeginTransaction(ctx)
}
func (f *Filer) CommitTransaction(ctx context.Context) error {
return f.store.CommitTransaction(ctx)
return f.Store.CommitTransaction(ctx)
}
func (f *Filer) RollbackTransaction(ctx context.Context) error {
return f.store.RollbackTransaction(ctx)
return f.Store.RollbackTransaction(ctx)
}
func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool) error {
@ -137,7 +150,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
}
glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
mkdirErr := f.store.InsertEntry(ctx, dirEntry)
mkdirErr := f.Store.InsertEntry(ctx, dirEntry)
if mkdirErr != nil {
if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
@ -180,7 +193,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
glog.V(4).Infof("CreateEntry %s: old entry: %v exclusive:%v", entry.FullPath, oldEntry, o_excl)
if oldEntry == nil {
if err := f.store.InsertEntry(ctx, entry); err != nil {
if err := f.Store.InsertEntry(ctx, entry); err != nil {
glog.Errorf("insert entry %s: %v", entry.FullPath, err)
return fmt.Errorf("insert entry %s: %v", entry.FullPath, err)
}
@ -216,7 +229,7 @@ func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err er
return fmt.Errorf("existing %s is a file", entry.FullPath)
}
}
return f.store.UpdateEntry(ctx, entry)
return f.Store.UpdateEntry(ctx, entry)
}
func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, err error) {
@ -235,10 +248,10 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e
},
}, nil
}
entry, err = f.store.FindEntry(ctx, p)
entry, err = f.Store.FindEntry(ctx, p)
if entry != nil && entry.TtlSec > 0 {
if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
f.store.DeleteEntry(ctx, p.Child(entry.Name()))
f.Store.DeleteEntry(ctx, p.Child(entry.Name()))
return nil, filer_pb.ErrNotFound
}
}
@ -264,7 +277,7 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start
}
func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int) (entries []*Entry, expiredCount int, lastFileName string, err error) {
listedEntries, listErr := f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
listedEntries, listErr := f.Store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
if listErr != nil {
return listedEntries, expiredCount, "", listErr
}
@ -272,7 +285,7 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
lastFileName = entry.Name()
if entry.TtlSec > 0 {
if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
f.store.DeleteEntry(ctx, p.Child(entry.Name()))
f.Store.DeleteEntry(ctx, p.Child(entry.Name()))
expiredCount++
continue
}
@ -323,5 +336,5 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
func (f *Filer) Shutdown() {
f.LocalMetaLogBuffer.Shutdown()
f.store.Shutdown()
f.Store.Shutdown()
}
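FindEntry keeps the lazy TTL rule visible in the hunk above: expiry is evaluated at read time, and an expired entry is deleted and then reported as not found. A self-contained Go sketch of that pattern, using stand-in types rather than the filer2 API:

package main

import (
	"errors"
	"fmt"
	"time"
)

var errNotFound = errors.New("not found") // stands in for filer_pb.ErrNotFound

type entry struct {
	crtime time.Time
	ttl    time.Duration
}

type store struct{ entries map[string]entry }

func (s *store) find(name string) (entry, error) {
	e, ok := s.entries[name]
	if !ok {
		return entry{}, errNotFound
	}
	if e.ttl > 0 && e.crtime.Add(e.ttl).Before(time.Now()) {
		delete(s.entries, name) // lazy delete on read, like f.Store.DeleteEntry
		return entry{}, errNotFound
	}
	return e, nil
}

func main() {
	s := &store{entries: map[string]entry{
		"fresh":   {crtime: time.Now(), ttl: time.Hour},
		"expired": {crtime: time.Now().Add(-2 * time.Second), ttl: time.Second},
	}}
	for _, name := range []string{"fresh", "expired"} {
		_, err := s.find(name)
		fmt.Println(name, err)
	}
}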

View File

@ -74,9 +74,9 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
if sub.IsDirectory() {
dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false)
f.cacheDelDirectory(string(sub.FullPath))
f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster)
chunks = append(chunks, dirChunks...)
} else {
f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster)
chunks = append(chunks, sub.Chunks...)
}
if err != nil && !ignoreRecursiveError {
@ -91,10 +91,12 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
glog.V(3).Infof("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks)
if storeDeletionErr := f.store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
if storeDeletionErr := f.Store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr)
}
f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster)
return chunks, nil
}
@ -102,13 +104,14 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou
glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
if storeDeletionErr := f.store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
if storeDeletionErr := f.Store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
return fmt.Errorf("filer store delete: %v", storeDeletionErr)
}
if entry.IsDirectory() {
f.cacheDelDirectory(string(entry.FullPath))
}
} else {
f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster)
}
return nil
}

View File

@ -78,8 +78,13 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
// startTime.Second(), startTime.Nanosecond(),
)
for {
if err := f.appendToFile(targetFile, buf); err != nil {
glog.V(0).Infof("log write failed %s: %v", targetFile, err)
glog.V(1).Infof("log write failed %s: %v", targetFile, err)
time.Sleep(737 * time.Millisecond)
} else {
break
}
}
}

View File

@ -29,8 +29,13 @@ type FilerStore interface {
Shutdown()
}
type FilerLocalStore interface {
UpdateOffset(filer string, lastTsNs int64) error
ReadOffset(filer string) (lastTsNs int64, err error)
}
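The new FilerLocalStore contract is small: persist and recall one timestamp per peer filer. A minimal in-memory sketch of an implementation (a LevelDB-backed one appears later in this diff):

package localstore

import (
	"fmt"
	"sync"
)

// memLocalStore is a hypothetical in-memory FilerLocalStore.
type memLocalStore struct {
	mu      sync.Mutex
	offsets map[string]int64
}

func newMemLocalStore() *memLocalStore {
	return &memLocalStore{offsets: make(map[string]int64)}
}

func (s *memLocalStore) UpdateOffset(filer string, lastTsNs int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.offsets[filer] = lastTsNs
	return nil
}

func (s *memLocalStore) ReadOffset(filer string) (int64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	ts, ok := s.offsets[filer]
	if !ok {
		return 0, fmt.Errorf("no offset recorded for filer %s", filer)
	}
	return ts, nil
}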
type FilerStoreWrapper struct {
actualStore FilerStore
ActualStore FilerStore
}
func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
@ -38,48 +43,48 @@ func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
return innerStore
}
return &FilerStoreWrapper{
actualStore: store,
ActualStore: store,
}
}
func (fsw *FilerStoreWrapper) GetName() string {
return fsw.actualStore.GetName()
return fsw.ActualStore.GetName()
}
func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
return fsw.actualStore.Initialize(configuration, prefix)
return fsw.ActualStore.Initialize(configuration, prefix)
}
func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "insert").Inc()
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "insert").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
}()
filer_pb.BeforeEntrySerialization(entry.Chunks)
return fsw.actualStore.InsertEntry(ctx, entry)
return fsw.ActualStore.InsertEntry(ctx, entry)
}
func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "update").Inc()
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "update").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "update").Observe(time.Since(start).Seconds())
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "update").Observe(time.Since(start).Seconds())
}()
filer_pb.BeforeEntrySerialization(entry.Chunks)
return fsw.actualStore.UpdateEntry(ctx, entry)
return fsw.ActualStore.UpdateEntry(ctx, entry)
}
func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "find").Inc()
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "find").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "find").Observe(time.Since(start).Seconds())
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds())
}()
entry, err = fsw.actualStore.FindEntry(ctx, fp)
entry, err = fsw.ActualStore.FindEntry(ctx, fp)
if err != nil {
return nil, err
}
@ -88,33 +93,33 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (
}
func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "delete").Inc()
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
}()
return fsw.actualStore.DeleteEntry(ctx, fp)
return fsw.ActualStore.DeleteEntry(ctx, fp)
}
func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Inc()
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
}()
return fsw.actualStore.DeleteFolderChildren(ctx, fp)
return fsw.ActualStore.DeleteFolderChildren(ctx, fp)
}
func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "list").Inc()
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "list").Observe(time.Since(start).Seconds())
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds())
}()
entries, err := fsw.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
if err != nil {
return nil, err
}
@ -125,17 +130,17 @@ func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath
}
func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
return fsw.actualStore.BeginTransaction(ctx)
return fsw.ActualStore.BeginTransaction(ctx)
}
func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
return fsw.actualStore.CommitTransaction(ctx)
return fsw.ActualStore.CommitTransaction(ctx)
}
func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
return fsw.actualStore.RollbackTransaction(ctx)
return fsw.ActualStore.RollbackTransaction(ctx)
}
func (fsw *FilerStoreWrapper) Shutdown() {
fsw.actualStore.Shutdown()
fsw.ActualStore.Shutdown()
}

View File

@ -0,0 +1,43 @@
package leveldb
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/util"
)
var (
_ = filer2.FilerLocalStore(&LevelDB2Store{})
)
func (store *LevelDB2Store) UpdateOffset(filer string, lastTsNs int64) error {
value := make([]byte, 8)
util.Uint64toBytes(value, uint64(lastTsNs))
err := store.dbs[0].Put([]byte("meta"+filer), value, nil)
if err != nil {
return fmt.Errorf("UpdateOffset %s : %v", filer, err)
}
println("UpdateOffset", filer, "lastTsNs", lastTsNs)
return nil
}
func (store *LevelDB2Store) ReadOffset(filer string) (lastTsNs int64, err error) {
value, err := store.dbs[0].Get([]byte("meta"+filer), nil)
if err != nil {
return 0, fmt.Errorf("ReadOffset %s : %v", filer, err)
}
lastTsNs = int64(util.BytesToUint64(value))
println("ReadOffset", filer, "lastTsNs", lastTsNs)
return
}
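UpdateOffset and ReadOffset persist the last-seen timestamp as a fixed 8-byte value under a "meta"+filer key in the first LevelDB shard. A small round-trip sketch of that encoding, assuming util.Uint64toBytes and util.BytesToUint64 are big-endian uint64 helpers:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	lastTsNs := int64(1594987756000000000) // a nanosecond timestamp

	// Encode into a fixed 8-byte value, as UpdateOffset does.
	value := make([]byte, 8)
	binary.BigEndian.PutUint64(value, uint64(lastTsNs))

	// Decode it back, as ReadOffset does.
	decoded := int64(binary.BigEndian.Uint64(value))
	fmt.Println(decoded == lastTsNs) // true
}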

View File

@ -37,13 +37,48 @@ func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAgg
return t
}
- func (ma *MetaAggregator) StartLoopSubscribe(lastTsNs int64) {
+ func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string) {
for _, filer := range ma.filers {
- go ma.subscribeToOneFiler(filer, lastTsNs)
+ go ma.subscribeToOneFiler(f, self, filer)
}
}
- func (ma *MetaAggregator) subscribeToOneFiler(filer string, lastTsNs int64) {
+ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer string) {
+ var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
+ lastPersistTime := time.Now()
+ changesSinceLastPersist := 0
+ lastTsNs := int64(0)
+ MaxChangeLimit := 100
+ if localStore, ok := f.Store.ActualStore.(FilerLocalStore); ok {
+ if self != filer {
+ if prevTsNs, err := localStore.ReadOffset(filer); err == nil {
+ lastTsNs = prevTsNs
+ }
+ glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs)
+ maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
+ if err := Replay(f.Store.ActualStore, event); err != nil {
+ glog.Errorf("failed to reply metadata change from %v: %v", filer, err)
+ return
+ }
+ changesSinceLastPersist++
+ if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) {
+ if err := localStore.UpdateOffset(filer, event.TsNs); err == nil {
+ lastPersistTime = time.Now()
+ changesSinceLastPersist = 0
+ } else {
+ glog.V(0).Infof("failed to update offset for %v: %v", filer, err)
+ }
+ }
+ }
+ } else {
+ glog.V(0).Infof("skipping following self: %v", self)
+ }
+ }
processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error {
data, err := proto.Marshal(event)
@ -54,13 +89,16 @@ func (ma *MetaAggregator) subscribeToOneFiler(filer string, lastTsNs int64) {
dir := event.Directory
// println("received meta change", dir, "size", len(data))
ma.MetaLogBuffer.AddToBuffer([]byte(dir), data)
+ if maybeReplicateMetadataChange != nil {
+ maybeReplicateMetadataChange(event)
+ }
return nil
}
for {
err := pb.WithFilerClient(filer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
ClientName: "filer",
ClientName: "filer:" + self,
PathPrefix: "/",
SinceNs: lastTsNs,
})
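The subscriber persists its offset only after MaxChangeLimit (100) changes or one minute, whichever comes first, so a restart replays at most a bounded window of already-applied events instead of checkpointing on every message. A compact sketch of that policy, with a stand-in persist function in place of UpdateOffset:

package main

import (
	"fmt"
	"time"
)

func main() {
	const maxChangeLimit = 100 // mirrors MaxChangeLimit above
	lastPersistTime := time.Now()
	changes := 0

	persist := func(tsNs int64) { fmt.Println("checkpoint at", tsNs) }

	for tsNs := int64(0); tsNs < 250; tsNs++ { // stand-in event stream
		changes++
		if changes >= maxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) {
			persist(tsNs) // checkpoints at 99 and 199 here
			lastPersistTime = time.Now()
			changes = 0
		}
	}
}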

View File

@ -0,0 +1,37 @@
package filer2
import (
"context"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
func Replay(filerStore FilerStore, resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
var oldPath util.FullPath
var newEntry *Entry
if message.OldEntry != nil {
oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name)
glog.V(4).Infof("deleting %v", oldPath)
if err := filerStore.DeleteEntry(context.Background(), oldPath); err != nil {
return err
}
}
if message.NewEntry != nil {
dir := resp.Directory
if message.NewParentPath != "" {
dir = message.NewParentPath
}
key := util.NewFullPath(dir, message.NewEntry.Name)
glog.V(4).Infof("creating %v", key)
newEntry = FromPbEntry(dir, message.NewEntry)
if err := filerStore.InsertEntry(context.Background(), newEntry); err != nil {
return err
}
}
return nil
}
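Replay applies one metadata event to a plain FilerStore: a present OldEntry means delete the old path, a present NewEntry means insert the new one, and both together express an update or rename, with NewParentPath overriding the directory on a move. A tiny classifier mirroring that branching:

package main

import "fmt"

// eventKind mirrors Replay's branching on OldEntry/NewEntry presence.
func eventKind(hasOld, hasNew bool) string {
	switch {
	case hasOld && hasNew:
		return "update/rename: delete old path, then insert new entry"
	case hasOld:
		return "delete: delete old path"
	case hasNew:
		return "create: insert new entry"
	default:
		return "no-op"
	}
}

func main() {
	fmt.Println(eventKind(false, true)) // create
	fmt.Println(eventKind(true, false)) // delete
	fmt.Println(eventKind(true, true))  // rename, directory taken from NewParentPath
}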

View File

@ -10,18 +10,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
func InitMetaCache(mc *MetaCache, client filer_pb.FilerClient, path string) error {
+ return nil
- glog.V(0).Infof("synchronizing meta data ...")
- filer_pb.TraverseBfs(client, util.FullPath(path), func(parentPath util.FullPath, pbEntry *filer_pb.Entry) {
- entry := filer2.FromPbEntry(string(parentPath), pbEntry)
- if err := mc.InsertEntry(context.Background(), entry); err != nil {
- glog.V(0).Infof("read %s: %v", entry.FullPath, err)
- }
- })
- return nil
}
func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) {
mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) {
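With the eager TraverseBfs synchronization gone (InitMetaCache is now a no-op), the mount relies on EnsureVisited to pull each directory from the filer lazily, the first time it is touched. A minimal sketch of that visit-once pattern, using a plain map where the real code uses a bounded tree (visitedBoundary):

package main

import (
	"fmt"
	"sync"
)

type lazyCache struct {
	mu      sync.Mutex
	visited map[string]bool
}

// ensureVisited fetches a directory's children only on first visit.
func (c *lazyCache) ensureVisited(dir string, fetch func(string) []string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.visited[dir] {
		return
	}
	for _, child := range fetch(dir) {
		fmt.Println("cached entry:", dir+"/"+child)
	}
	c.visited[dir] = true
}

func main() {
	c := &lazyCache{visited: map[string]bool{}}
	fetch := func(dir string) []string { return []string{"a.txt", "b.txt"} } // stand-in filer lookup
	c.ensureVisited("/buckets", fetch)
	c.ensureVisited("/buckets", fetch) // second call is a no-op
}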

View File

@ -56,13 +56,13 @@ func SubscribeMetaEvents(mc *MetaCache, client filer_pb.FilerClient, dir string,
}
if err := processEventFn(resp); err != nil {
return fmt.Errorf("process %v: %v", resp, err)
glog.Fatalf("process %v: %v", resp, err)
}
lastTsNs = resp.TsNs
}
})
if err != nil {
glog.V(0).Infof("subscribing filer meta change: %v", err)
glog.Errorf("subscribing filer meta change: %v", err)
time.Sleep(time.Second)
}
}

View File

@ -10,11 +10,13 @@ import (
// https://github.com/bazil/fuse/issues/130
var _ = fs.NodeAccesser(&Dir{})
func (dir *Dir) Access(ctx context.Context, req *fuse.AccessRequest) error {
return fuse.ENOSYS
}
var _ = fs.NodeAccesser(&File{})
func (file *File) Access(ctx context.Context, req *fuse.AccessRequest) error {
return fuse.ENOSYS
}

View File

@ -84,7 +84,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
},
},
}
- cacheUniqueId := util.Md5([]byte(option.FilerGrpcAddress))[0:4]
+ cacheUniqueId := util.Md5([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4]
cacheDir := path.Join(option.CacheDir, cacheUniqueId)
if option.CacheSizeMB > 0 {
os.MkdirAll(cacheDir, 0755)
@ -96,14 +96,10 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"))
startTime := time.Now()
- if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil {
- glog.V(0).Infof("failed to init meta cache: %v", err)
- } else {
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
grace.OnInterrupt(func() {
wfs.metaCache.Shutdown()
})
- }
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
wfs.fsNodeCache = newFsCache(wfs.root)
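Folding the mount root path and the weed version into the cache id means a remount against a different root, or an upgraded binary, gets a fresh local chunk cache instead of reusing stale entries. A sketch of the derivation; util.Md5 is assumed here to yield a printable digest, with hex used purely for illustration:

package main

import (
	"crypto/md5"
	"fmt"
)

// cacheUniqueId keys the cache directory on all three inputs together.
func cacheUniqueId(filerGrpcAddress, mountRoot, version string) string {
	sum := md5.Sum([]byte(filerGrpcAddress + mountRoot + version))
	return fmt.Sprintf("%x", sum)[0:4]
}

func main() {
	fmt.Println(cacheUniqueId("localhost:18888", "/", "30GB 1.85"))
}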

View File

@ -18,7 +18,7 @@ type MockClient struct {
}
func (m *MockClient) Do(req *http.Request) (*http.Response, error) {
- n, originalSize, err := needle.CreateNeedleFromRequest(req, 1024*1024)
+ n, originalSize, err := needle.CreateNeedleFromRequest(req, false, 1024*1024)
if m.needleHandling != nil {
m.needleHandling(n, originalSize, err)
}
@ -101,7 +101,6 @@ func TestCreateNeedleFromRequest(t *testing.T) {
}
var textContent = `Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

View File

@ -205,8 +205,8 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
req, postErr := http.NewRequest("POST", uploadUrl, body_buf)
if postErr != nil {
glog.V(0).Infoln("failing to upload to", uploadUrl, postErr.Error())
return nil, postErr
glog.V(1).Infof("failing to upload to %s: %v", uploadUrl, postErr)
return nil, fmt.Errorf("failing to upload to %s: %v", uploadUrl, postErr)
}
req.Header.Set("Content-Type", content_type)
for k, v := range pairMap {
@ -217,8 +217,8 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
}
resp, post_err := HttpClient.Do(req)
if post_err != nil {
glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error())
return nil, post_err
glog.V(1).Infof("failing to upload to %v: %v", uploadUrl, post_err)
return nil, fmt.Errorf("failing to upload to %v: %v", uploadUrl, post_err)
}
defer resp.Body.Close()

View File

@ -115,6 +115,11 @@ message FileChunk {
FileId source_fid = 8;
bytes cipher_key = 9;
bool is_compressed = 10;
+ bool is_chunk_manifest = 11; // content is a list of FileChunks
}
+ message FileChunkManifest {
+ repeated FileChunk chunks = 1;
+ }
message FileId {
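The new flag lets a chunk's payload be, recursively, a serialized FileChunkManifest rather than file data, so very large files do not need thousands of chunk entries inline in the filer entry. A sketch of that composition, assuming the usual protoc-gen-go names for the fields above (FileId, IsChunkManifest, Chunks):

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/golang/protobuf/proto"
)

func main() {
	dataChunks := []*filer_pb.FileChunk{
		{FileId: "3,01637037d6", Offset: 0, Size: 8 << 20},
		{FileId: "4,02a23b5b71", Offset: 8 << 20, Size: 8 << 20},
	}
	// The manifest itself is stored as one more chunk whose payload is the
	// marshaled FileChunkManifest, flagged so readers know to recurse.
	payload, err := proto.Marshal(&filer_pb.FileChunkManifest{Chunks: dataChunks})
	if err != nil {
		panic(err)
	}
	manifestChunk := &filer_pb.FileChunk{IsChunkManifest: true, Size: uint64(len(payload))}
	fmt.Println("manifest chunk describing", len(dataChunks), "chunks in", manifestChunk.Size, "bytes")
}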

File diff suppressed because it is too large

View File

@ -91,7 +91,13 @@ func (iam *IdentityAccessManagement) loadS3ApiConfiguration(fileName string) err
return nil
}
+ func (iam *IdentityAccessManagement) isEnabled() bool {
+ return len(iam.identities) > 0
+ }
func (iam *IdentityAccessManagement) lookupByAccessKey(accessKey string) (identity *Identity, cred *Credential, found bool) {
for _, ident := range iam.identities {
for _, cred := range ident.Credentials {
if cred.AccessKey == accessKey {
@ -104,7 +110,7 @@ func (iam *IdentityAccessManagement) lookupByAccessKey(accessKey string) (identi
func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) http.HandlerFunc {
- if len(iam.identities) == 0 {
+ if !iam.isEnabled() {
return f
}
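isEnabled gives the auth middleware a single switch: with no identities configured, Auth hands back the original handler untouched, so anonymous access keeps working. A stand-in sketch of that pass-through shape (not the real s3api types):

package main

import (
	"fmt"
	"net/http"
)

type iam struct{ identities []string }

func (m *iam) isEnabled() bool { return len(m.identities) > 0 }

// Auth wraps a handler with authentication only when identities exist.
func (m *iam) Auth(f http.HandlerFunc) http.HandlerFunc {
	if !m.isEnabled() {
		return f // auth disabled: pass through untouched
	}
	return func(w http.ResponseWriter, r *http.Request) {
		// ... verify access key / signature here, then:
		f(w, r)
	}
}

func main() {
	h := (&iam{}).Auth(func(w http.ResponseWriter, r *http.Request) {})
	fmt.Println(h != nil) // pass-through handler returned
}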

View File

@ -274,7 +274,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
ra := ranges[0]
w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
w.Header().Set("Content-Range", ra.contentRange(totalSize))
- w.WriteHeader(http.StatusPartialContent)
+ // w.WriteHeader(http.StatusPartialContent)
err = writeFn(w, ra.start, ra.length)
if err != nil {
@ -315,7 +315,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
if w.Header().Get("Content-Encoding") == "" {
w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
}
- w.WriteHeader(http.StatusPartialContent)
+ // w.WriteHeader(http.StatusPartialContent)
if _, err := io.CopyN(w, sendContent, sendSize); err != nil {
http.Error(w, "Internal Error", http.StatusInternalServerError)
return
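Dropping the explicit WriteHeader defers the status line until the first body write, so if writeFn or io.CopyN fails before any bytes go out, the server can still send an error status instead of a committed 206; the visible trade-off is that successful range responses now default to 200 OK. A small demonstration of Go's implicit WriteHeader behavior:

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// The status line goes out on the first Write (defaulting to 200 OK), so
// while nothing has been written yet, an error path can still pick the status.
func rangeHandler(w http.ResponseWriter, r *http.Request, fail bool) {
	w.Header().Set("Content-Range", "bytes 0-9/100")
	if fail { // e.g. the chunk read failed before any bytes were sent
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		return
	}
	w.Write([]byte("0123456789")) // implicit WriteHeader happens here
}

func main() {
	for _, fail := range []bool{false, true} {
		rec := httptest.NewRecorder()
		rangeHandler(rec, httptest.NewRequest("GET", "/", nil), fail)
		fmt.Println(rec.Code) // 200, then 500
	}
}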

View File

@ -37,10 +37,10 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
lastReadTime = time.Unix(0, processedTsNs)
}
- err = fs.metaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
- fs.metaAggregator.ListenersLock.Lock()
- fs.metaAggregator.ListenersCond.Wait()
- fs.metaAggregator.ListenersLock.Unlock()
+ err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ fs.filer.MetaAggregator.ListenersLock.Lock()
+ fs.filer.MetaAggregator.ListenersCond.Wait()
+ fs.filer.MetaAggregator.ListenersLock.Unlock()
return true
}, eachLogEntryFn)
@ -63,6 +63,20 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
+ if _, ok := fs.filer.Store.ActualStore.(filer2.FilerLocalStore); ok {
+ // println("reading from persisted logs ...")
+ processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if err != nil {
+ return fmt.Errorf("reading from persisted logs: %v", err)
+ }
+ if processedTsNs != 0 {
+ lastReadTime = time.Unix(0, processedTsNs)
+ }
+ glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+ }
+ // println("reading from in memory logs ...")
err := fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
@ -117,6 +131,7 @@ func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream file
EventNotification: eventNotification,
TsNs: tsNs,
}
// println("sending", dirPath, entryName)
if err := stream.Send(message); err != nil {
glog.V(0).Infof("=> client %v: %+v", clientName, err)
return err
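SubscribeLocalMetadata now serves catch-up in two phases: replay from the persisted log while the store supports FilerLocalStore, then continue from the in-memory log buffer at the last processed timestamp. A toy sketch of that hand-off, with bare timestamps standing in for log entries:

package main

import "fmt"

type logEntry struct{ tsNs int64 }

func main() {
	persisted := []logEntry{{1}, {2}, {3}}
	inMemory := []logEntry{{3}, {4}, {5}} // overlap is skipped by timestamp

	lastReadTs := int64(0)
	for _, e := range persisted { // phase 1: ReadPersistedLogBuffer
		fmt.Println("replay persisted", e.tsNs)
		lastReadTs = e.tsNs
	}
	for _, e := range inMemory { // phase 2: LoopProcessLogData
		if e.tsNs <= lastReadTs {
			continue
		}
		fmt.Println("stream live", e.tsNs)
	}
}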

View File

@ -5,7 +5,6 @@ import (
"fmt"
"net/http"
"os"
"strings"
"sync"
"time"
@ -60,7 +59,6 @@ type FilerServer struct {
option *FilerOption
secret security.SigningKey
filer *filer2.Filer
- metaAggregator *filer2.MetaAggregator
grpcDialOption grpc.DialOption
// notifying clients
@ -121,15 +119,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
// set peers
- if strings.HasPrefix(fs.filer.GetStore().GetName(), "leveldb") && len(option.Filers) > 0 {
- glog.Fatalf("filers using separate leveldb stores should not configure %d peers %+v", len(option.Filers), option.Filers)
- }
- if len(option.Filers) == 0 {
- option.Filers = append(option.Filers, fmt.Sprintf("%s:%d", option.Host, option.Port))
- }
- fs.metaAggregator = filer2.NewMetaAggregator(option.Filers, fs.grpcDialOption)
- fs.metaAggregator.StartLoopSubscribe(time.Now().UnixNano())
+ fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers)
fs.filer.LoadBuckets()
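The peer wiring moves out of the server into the filer itself: AggregateFromPeers is called with this filer's own address plus the configured peers. Presumably it keeps the removed defaulting step, where a filer with no configured peers follows only itself; a sketch of just that step, using a hypothetical helper name:

package main

import "fmt"

// defaultPeers mirrors the removed "if len(option.Filers) == 0" logic.
func defaultPeers(self string, filers []string) []string {
	if len(filers) == 0 {
		return []string{self} // a lone filer subscribes only to itself
	}
	return filers
}

func main() {
	fmt.Println(defaultPeers("localhost:8888", nil))
	fmt.Println(defaultPeers("localhost:8888", []string{"filerA:8888", "filerB:8888"}))
}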

View File

@ -32,7 +32,7 @@ func TestMemoryUsage(t *testing.T) {
startTime := time.Now()
for i := 0; i < 10; i++ {
indexFile, ie := os.OpenFile("../../../test/sample.idx", os.O_RDWR|os.O_RDONLY, 0644)
indexFile, ie := os.OpenFile("../../../test/data/sample.idx", os.O_RDWR|os.O_RDONLY, 0644)
if ie != nil {
log.Fatalln(ie)
}

View File

@ -16,6 +16,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
)
const (
@ -52,7 +53,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di
s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind}
s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ {
- location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i])
+ location := NewDiskLocation(util.ResolvePath(dirnames[i]), maxVolumeCounts[i], minFreeSpacePercents[i])
location.loadExistingVolumes(needleMapKind)
s.Locations = append(s.Locations, location)
stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i]))

View File

@ -9,9 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
var (
visitFn = func(path util.FullPath) (childDirectories []string, err error) {
fmt.Printf(" visit %v ...\n", path)
switch path {
@ -37,14 +35,11 @@ var (
return nil, nil
}
printMap = func(m map[string]*Node) {
for k := range m {
println(" >", k)
}
}
)
func TestBoundedTree(t *testing.T) {

View File

@ -54,6 +54,7 @@ func ungzipData(input []byte) ([]byte, error) {
}
var decoder, _ = zstd.NewReader(nil)
func unzstdData(input []byte) ([]byte, error) {
return decoder.DecodeAll(input, nil)
}

View File

@ -5,7 +5,7 @@ import (
)
var (
VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 84)
VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 85)
COMMIT = ""
)

View File

@ -3,6 +3,9 @@ package util
import (
"errors"
"os"
"os/user"
"path/filepath"
"strings"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
@ -63,3 +66,20 @@ func CheckFile(filename string) (exists, canRead, canWrite bool, modTime time.Ti
fileSize = fi.Size()
return
}
+ func ResolvePath(path string) string {
+ usr, _ := user.Current()
+ dir := usr.HomeDir
+ if path == "~" {
+ // In case of "~", which won't be caught by the "else if"
+ path = dir
+ } else if strings.HasPrefix(path, "~/") {
+ // Use strings.HasPrefix so we don't match paths like
+ // "/something/~/something/"
+ path = filepath.Join(dir, path[2:])
+ }
+ return path
+ }
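ResolvePath expands a bare "~" or a leading "~/" to the current user's home directory and deliberately leaves "~" elsewhere in a path alone; note the committed version discards user.Current()'s error before dereferencing the result. The same rules with that error guarded, as a stand-alone sketch:

package main

import (
	"fmt"
	"os/user"
	"path/filepath"
	"strings"
)

func resolvePath(path string) string {
	usr, err := user.Current()
	if err != nil {
		return path // sketch: fall back to the raw path if the lookup fails
	}
	if path == "~" {
		return usr.HomeDir
	}
	if strings.HasPrefix(path, "~/") {
		return filepath.Join(usr.HomeDir, path[2:])
	}
	return path
}

func main() {
	fmt.Println(resolvePath("~/data/volume1")) // e.g. /home/alice/data/volume1
	fmt.Println(resolvePath("/srv/~/keep"))    // unchanged
}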

View File

@ -145,13 +145,16 @@ func (m *LogBuffer) loopInterval() {
func (m *LogBuffer) copyToFlush() *dataToFlush {
if m.flushFn != nil && m.pos > 0 {
if m.pos > 0 {
// fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos)
d := &dataToFlush{
var d *dataToFlush
if m.flushFn != nil {
d = &dataToFlush{
startTime: m.startTime,
stopTime: m.stopTime,
data: copiedBytes(m.buf[:m.pos]),
}
}
// fmt.Printf("flusing [0,%d) with %d entries\n", m.pos, len(m.idx))
m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos)
m.pos = 0