diff --git a/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java b/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java deleted file mode 100644 index 52c7ac09c..000000000 --- a/other/java/client/src/main/java/seaweedfs/client/ReadAheadInputStream.java +++ /dev/null @@ -1,404 +0,0 @@ -package seaweedfs.client; - -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -// package org.apache.spark.io; - -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.concurrent.GuardedBy; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.nio.ByteBuffer; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -/** - * {@link InputStream} implementation which asynchronously reads ahead from the underlying input - * stream when specified amount of data has been read from the current buffer. It does it by - * maintaining two buffers - active buffer and read ahead buffer. Active buffer contains data - * which should be returned when a read() call is issued. The read ahead buffer is used to - * asynchronously read from the underlying input stream and once the current active buffer is - * exhausted, we flip the two buffers so that we can start reading from the read ahead buffer - * without being blocked in disk I/O. - */ -public class ReadAheadInputStream extends InputStream { - - private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class); - - private ReentrantLock stateChangeLock = new ReentrantLock(); - - @GuardedBy("stateChangeLock") - private ByteBuffer activeBuffer; - - @GuardedBy("stateChangeLock") - private ByteBuffer readAheadBuffer; - - @GuardedBy("stateChangeLock") - private boolean endOfStream; - - @GuardedBy("stateChangeLock") - // true if async read is in progress - private boolean readInProgress; - - @GuardedBy("stateChangeLock") - // true if read is aborted due to an exception in reading from underlying input stream. - private boolean readAborted; - - @GuardedBy("stateChangeLock") - private Throwable readException; - - @GuardedBy("stateChangeLock") - // whether the close method is called. - private boolean isClosed; - - @GuardedBy("stateChangeLock") - // true when the close method will close the underlying input stream. This is valid only if - // `isClosed` is true. - private boolean isUnderlyingInputStreamBeingClosed; - - @GuardedBy("stateChangeLock") - // whether there is a read ahead task running, - private boolean isReading; - - // whether there is a reader waiting for data. - private AtomicBoolean isWaiting = new AtomicBoolean(false); - - private final InputStream underlyingInputStream; - - private final ExecutorService executorService = Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("read-ahread").build() - ); - - private final Condition asyncReadComplete = stateChangeLock.newCondition(); - - private static final ThreadLocal oneByte = ThreadLocal.withInitial(() -> new byte[1]); - - /** - * Creates a ReadAheadInputStream with the specified buffer size and read-ahead - * threshold - * - * @param inputStream The underlying input stream. - * @param bufferSizeInBytes The buffer size. - */ - public ReadAheadInputStream( - InputStream inputStream, int bufferSizeInBytes) { - Preconditions.checkArgument(bufferSizeInBytes > 0, - "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes); - activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); - readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); - this.underlyingInputStream = inputStream; - activeBuffer.flip(); - readAheadBuffer.flip(); - } - - private boolean isEndOfStream() { - return (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream); - } - - private void checkReadException() throws IOException { - if (readAborted) { - Throwables.propagateIfPossible(readException, IOException.class); - throw new IOException(readException); - } - } - - /** Read data from underlyingInputStream to readAheadBuffer asynchronously. */ - private void readAsync() throws IOException { - stateChangeLock.lock(); - final byte[] arr = readAheadBuffer.array(); - try { - if (endOfStream || readInProgress) { - return; - } - checkReadException(); - readAheadBuffer.position(0); - readAheadBuffer.flip(); - readInProgress = true; - } finally { - stateChangeLock.unlock(); - } - executorService.execute(() -> { - stateChangeLock.lock(); - try { - if (isClosed) { - readInProgress = false; - return; - } - // Flip this so that the close method will not close the underlying input stream when we - // are reading. - isReading = true; - } finally { - stateChangeLock.unlock(); - } - - // Please note that it is safe to release the lock and read into the read ahead buffer - // because either of following two conditions will hold - 1. The active buffer has - // data available to read so the reader will not read from the read ahead buffer. - // 2. This is the first time read is called or the active buffer is exhausted, - // in that case the reader waits for this async read to complete. - // So there is no race condition in both the situations. - int read = 0; - int off = 0, len = arr.length; - Throwable exception = null; - try { - // try to fill the read ahead buffer. - // if a reader is waiting, possibly return early. - do { - read = underlyingInputStream.read(arr, off, len); - if (read <= 0) break; - off += read; - len -= read; - } while (len > 0 && !isWaiting.get()); - } catch (Throwable ex) { - exception = ex; - if (ex instanceof Error) { - // `readException` may not be reported to the user. Rethrow Error to make sure at least - // The user can see Error in UncaughtExceptionHandler. - throw (Error) ex; - } - } finally { - stateChangeLock.lock(); - readAheadBuffer.limit(off); - if (read < 0 || (exception instanceof EOFException)) { - endOfStream = true; - } else if (exception != null) { - readAborted = true; - readException = exception; - } - readInProgress = false; - signalAsyncReadComplete(); - stateChangeLock.unlock(); - closeUnderlyingInputStreamIfNecessary(); - } - }); - } - - private void closeUnderlyingInputStreamIfNecessary() { - boolean needToCloseUnderlyingInputStream = false; - stateChangeLock.lock(); - try { - isReading = false; - if (isClosed && !isUnderlyingInputStreamBeingClosed) { - // close method cannot close underlyingInputStream because we were reading. - needToCloseUnderlyingInputStream = true; - } - } finally { - stateChangeLock.unlock(); - } - if (needToCloseUnderlyingInputStream) { - try { - underlyingInputStream.close(); - } catch (IOException e) { - logger.warn(e.getMessage(), e); - } - } - } - - private void signalAsyncReadComplete() { - stateChangeLock.lock(); - try { - asyncReadComplete.signalAll(); - } finally { - stateChangeLock.unlock(); - } - } - - private void waitForAsyncReadComplete() throws IOException { - stateChangeLock.lock(); - isWaiting.set(true); - try { - // There is only one reader, and one writer, so the writer should signal only once, - // but a while loop checking the wake up condition is still needed to avoid spurious wakeups. - while (readInProgress) { - asyncReadComplete.await(); - } - } catch (InterruptedException e) { - InterruptedIOException iio = new InterruptedIOException(e.getMessage()); - iio.initCause(e); - throw iio; - } finally { - isWaiting.set(false); - stateChangeLock.unlock(); - } - checkReadException(); - } - - @Override - public int read() throws IOException { - if (activeBuffer.hasRemaining()) { - // short path - just get one byte. - return activeBuffer.get() & 0xFF; - } else { - byte[] oneByteArray = oneByte.get(); - return read(oneByteArray, 0, 1) == -1 ? -1 : oneByteArray[0] & 0xFF; - } - } - - @Override - public int read(byte[] b, int offset, int len) throws IOException { - if (offset < 0 || len < 0 || len > b.length - offset) { - throw new IndexOutOfBoundsException(); - } - if (len == 0) { - return 0; - } - - if (!activeBuffer.hasRemaining()) { - // No remaining in active buffer - lock and switch to write ahead buffer. - stateChangeLock.lock(); - try { - waitForAsyncReadComplete(); - if (!readAheadBuffer.hasRemaining()) { - // The first read. - readAsync(); - waitForAsyncReadComplete(); - if (isEndOfStream()) { - return -1; - } - } - // Swap the newly read read ahead buffer in place of empty active buffer. - swapBuffers(); - // After swapping buffers, trigger another async read for read ahead buffer. - readAsync(); - } finally { - stateChangeLock.unlock(); - } - } - len = Math.min(len, activeBuffer.remaining()); - activeBuffer.get(b, offset, len); - - return len; - } - - /** - * flip the active and read ahead buffer - */ - private void swapBuffers() { - ByteBuffer temp = activeBuffer; - activeBuffer = readAheadBuffer; - readAheadBuffer = temp; - } - - @Override - public int available() throws IOException { - stateChangeLock.lock(); - // Make sure we have no integer overflow. - try { - return (int) Math.min((long) Integer.MAX_VALUE, - (long) activeBuffer.remaining() + readAheadBuffer.remaining()); - } finally { - stateChangeLock.unlock(); - } - } - - @Override - public long skip(long n) throws IOException { - if (n <= 0L) { - return 0L; - } - if (n <= activeBuffer.remaining()) { - // Only skipping from active buffer is sufficient - activeBuffer.position((int) n + activeBuffer.position()); - return n; - } - stateChangeLock.lock(); - long skipped; - try { - skipped = skipInternal(n); - } finally { - stateChangeLock.unlock(); - } - return skipped; - } - - /** - * Internal skip function which should be called only from skip() api. The assumption is that - * the stateChangeLock is already acquired in the caller before calling this function. - */ - private long skipInternal(long n) throws IOException { - assert (stateChangeLock.isLocked()); - waitForAsyncReadComplete(); - if (isEndOfStream()) { - return 0; - } - if (available() >= n) { - // we can skip from the internal buffers - int toSkip = (int) n; - // We need to skip from both active buffer and read ahead buffer - toSkip -= activeBuffer.remaining(); - assert(toSkip > 0); // skipping from activeBuffer already handled. - activeBuffer.position(0); - activeBuffer.flip(); - readAheadBuffer.position(toSkip + readAheadBuffer.position()); - swapBuffers(); - // Trigger async read to emptied read ahead buffer. - readAsync(); - return n; - } else { - int skippedBytes = available(); - long toSkip = n - skippedBytes; - activeBuffer.position(0); - activeBuffer.flip(); - readAheadBuffer.position(0); - readAheadBuffer.flip(); - long skippedFromInputStream = underlyingInputStream.skip(toSkip); - readAsync(); - return skippedBytes + skippedFromInputStream; - } - } - - @Override - public void close() throws IOException { - boolean isSafeToCloseUnderlyingInputStream = false; - stateChangeLock.lock(); - try { - if (isClosed) { - return; - } - isClosed = true; - if (!isReading) { - // Nobody is reading, so we can close the underlying input stream in this method. - isSafeToCloseUnderlyingInputStream = true; - // Flip this to make sure the read ahead task will not close the underlying input stream. - isUnderlyingInputStreamBeingClosed = true; - } - } finally { - stateChangeLock.unlock(); - } - - try { - executorService.shutdownNow(); - executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); - } catch (InterruptedException e) { - InterruptedIOException iio = new InterruptedIOException(e.getMessage()); - iio.initCause(e); - throw iio; - } finally { - if (isSafeToCloseUnderlyingInputStream) { - underlyingInputStream.close(); - } - } - } -} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 836bb4db5..fd8877806 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -11,7 +11,6 @@ import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import seaweedfs.client.FilerProto; -import seaweedfs.client.ReadAheadInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -77,7 +76,7 @@ public class SeaweedFileSystem extends FileSystem { try { FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); - return new FSDataInputStream(new ReadAheadInputStream(inputStream, 16 * 1024 * 1024)); + return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null; diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 836bb4db5..fd8877806 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -11,7 +11,6 @@ import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import seaweedfs.client.FilerProto; -import seaweedfs.client.ReadAheadInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -77,7 +76,7 @@ public class SeaweedFileSystem extends FileSystem { try { FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); - return new FSDataInputStream(new ReadAheadInputStream(inputStream, 16 * 1024 * 1024)); + return new FSDataInputStream(new BufferedFSInputStream(inputStream, 16 * 1024 * 1024)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null;