Merge pull request #1968 from hilimd/master

HCFS over ftp
This commit is contained in:
Chris Lu 2021-04-06 00:50:31 -07:00 committed by GitHub
commit 100ed77387
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1867 additions and 0 deletions

View File

@ -0,0 +1,120 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>hdfs-over-ftp</groupId>
<artifactId>hdfs-over-ftp</artifactId>
<version>1.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.3</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.ftpserver</groupId>
<artifactId>ftpserver-core</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-hadoop3-client</artifactId>
<version>1.6.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<compilerArguments>
<verbose />
<bootclasspath>${java.home}/lib/rt.jar</bootclasspath>
</compilerArguments>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.6</version>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.hadoop.seaweed.ftp.ApplicationServer</mainClass>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
</manifest>
<manifestEntries>
<Class-Path>./</Class-Path>
</manifestEntries>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptors>
<descriptor>src/main/resources/assembly.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,14 @@
package org.apache.hadoop.seaweed.ftp;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ApplicationServer {
public static void main(String[] args) {
SpringApplication.run(ApplicationServer.class, args);
}
}

View File

@ -0,0 +1,27 @@
package org.apache.hadoop.seaweed.ftp.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket createRestApi() {
return new Docket(DocumentationType.SWAGGER_2)
.pathMapping("/")
.select()
.apis(RequestHandlerSelectors.basePackage("org.apache.hadoop.seaweed.ftp"))
.paths(PathSelectors.any())
.build().apiInfo(new ApiInfoBuilder()
.title("FTP API Doc")
.version("1.0")
.build());
}
}

View File

@ -0,0 +1,71 @@
package org.apache.hadoop.seaweed.ftp.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.hadoop.seaweed.ftp.service.HFtpService;
import org.apache.hadoop.seaweed.ftp.controller.vo.Result;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping("/manager")
@Api(tags = "FTP操作管理")
public class FtpManagerController {
private static Logger log = Logger.getLogger(FtpManagerController.class);
@Autowired
private HFtpService hdfsOverFtpServer;
@GetMapping("/status")
@ApiOperation("查看FTP服务状态")
public Result status() {
Map map = new HashMap<>();
try {
boolean status = hdfsOverFtpServer.statusServer();
map.put("is_running", status);
return new Result(true, map, "FTP 服务状态获取成功");
}catch (Exception e) {
log.error(e);
map.put("is_running", false);
return new Result(true, map, "FTP 服务状态获取成功");
}
}
@PutMapping("/start")
@ApiOperation("启动FTP服务")
public Result start() {
try {
boolean status = hdfsOverFtpServer.statusServer();
if(!status) {
hdfsOverFtpServer.startServer();
}
return new Result(true, "FTP 服务启动成功");
}catch (Exception e) {
log.error(e);
return new Result(false, "FTP 服务启动失败");
}
}
@PutMapping("/stop")
@ApiOperation("停止FTP服务")
public Result stop() {
try {
boolean status = hdfsOverFtpServer.statusServer();
if(status) {
hdfsOverFtpServer.stopServer();
}
return new Result(true, "FTP 服务停止成功");
}catch (Exception e) {
log.error(e);
return new Result(false, "FTP 服务停止失败");
}
}
}

View File

@ -0,0 +1,98 @@
package org.apache.hadoop.seaweed.ftp.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.ftpserver.ftplet.User;
import org.apache.ftpserver.usermanager.Md5PasswordEncryptor;
import org.apache.ftpserver.usermanager.UserFactory;
import org.apache.hadoop.seaweed.ftp.controller.vo.FtpUser;
import org.apache.hadoop.seaweed.ftp.controller.vo.Result;
import org.apache.hadoop.seaweed.ftp.users.HdfsUserManager;
import org.apache.log4j.Logger;
import org.springframework.web.bind.annotation.*;
import java.io.File;
@RestController
@RequestMapping("/user")
@Api(tags = "FTP用户管理")
public class UserController {
private static Logger log = Logger.getLogger(UserController.class);
/***
* {
* "name": "test",
* "password": "test",
* "homeDirectory": "/buckets/test/"
* }
* @param ftpUser
* @return
*/
@PostMapping("/add")
@ApiOperation("新增/编辑用户")
public Result add(@RequestBody FtpUser ftpUser) {
try {
HdfsUserManager userManagerFactory = new HdfsUserManager();
userManagerFactory.setFile(new File(System.getProperty("user.dir") + File.separator + "users.properties"));
userManagerFactory.setPasswordEncryptor(new Md5PasswordEncryptor());
UserFactory userFactory = new UserFactory();
userFactory.setHomeDirectory(ftpUser.getHomeDirectory());
userFactory.setName(ftpUser.getName());
userFactory.setPassword(ftpUser.getPassword());
userFactory.setEnabled(ftpUser.isEnabled());
userFactory.setMaxIdleTime(ftpUser.getMaxIdleTime());
User user = userFactory.createUser();
userManagerFactory.save(user, ftpUser.isRenamePush());
return new Result(true, "新建用户成功");
}catch (Exception e) {
log.error(e);
return new Result(false, "新建用户失败");
}
}
@DeleteMapping("/delete/{user}")
@ApiOperation("删除用户")
public Result delete(@PathVariable(value = "user") String user) {
try {
HdfsUserManager userManagerFactory = new HdfsUserManager();
userManagerFactory.setFile(new File(System.getProperty("user.dir") + File.separator + "users.properties"));
userManagerFactory.delete(user);
return new Result(true, "删除用户成功");
}catch (Exception e) {
log.error(e);
return new Result(false, "删除用户失败");
}
}
@GetMapping("/show/{userName}")
@ApiOperation("查看用户")
public Result show(@PathVariable(value = "userName") String userName) {
try {
HdfsUserManager userManagerFactory = new HdfsUserManager();
userManagerFactory.setFile(new File(System.getProperty("user.dir") + File.separator + "users.properties"));
User user = userManagerFactory.getUserByName(userName);
FtpUser ftpUser = new FtpUser(user.getHomeDirectory(), user.getPassword(), user.getEnabled(), user.getName(), user.getMaxIdleTime(), HdfsUserManager.getUserRenamePush(userName));
return new Result(true, ftpUser, "获取用户信息成功");
}catch (Exception e) {
log.error(e);
return new Result(false, "获取用户信息失败");
}
}
@GetMapping("/list")
@ApiOperation("列举用户")
public Result list() {
try {
HdfsUserManager userManagerFactory = new HdfsUserManager();
userManagerFactory.setFile(new File(System.getProperty("user.dir") + File.separator + "users.properties"));
String[] allUserNames = userManagerFactory.getAllUserNames();
return new Result(true, allUserNames, "列举用户成功");
}catch (Exception e) {
log.error(e);
return new Result(false, "列举用户失败");
}
}
}

View File

@ -0,0 +1,71 @@
package org.apache.hadoop.seaweed.ftp.controller.vo;
public class FtpUser {
private String homeDirectory;
private String password;
private boolean enabled;
private String name;
private int maxIdleTime;
private boolean renamePush;
public FtpUser() {
}
public FtpUser(String homeDirectory, String password, boolean enabled, String name, int maxIdleTime, boolean renamePush) {
this.homeDirectory = homeDirectory;
this.password = password;
this.enabled = enabled;
this.name = name;
this.maxIdleTime = maxIdleTime;
this.renamePush = renamePush;
}
public String getHomeDirectory() {
return homeDirectory;
}
public void setHomeDirectory(String homeDirectory) {
this.homeDirectory = homeDirectory;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getMaxIdleTime() {
return maxIdleTime;
}
public void setMaxIdleTime(int maxIdleTime) {
this.maxIdleTime = maxIdleTime;
}
public boolean isRenamePush() {
return renamePush;
}
public void setRenamePush(boolean renamePush) {
this.renamePush = renamePush;
}
}

View File

@ -0,0 +1,43 @@
package org.apache.hadoop.seaweed.ftp.controller.vo;
public class Result {
private boolean status;
private Object data;
private String message;
public Result(boolean status, String message) {
this.status = status;
this.message = message;
}
public Result(boolean status, Object data, String message) {
this.status = status;
this.message = message;
this.data = data;
}
public boolean isStatus() {
return status;
}
public void setStatus(boolean status) {
this.status = status;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Object getData() {
return data;
}
public void setData(Object data) {
this.data = data;
}
}

View File

@ -0,0 +1,102 @@
package org.apache.hadoop.seaweed.ftp.service;
import org.apache.ftpserver.DataConnectionConfiguration;
import org.apache.ftpserver.DataConnectionConfigurationFactory;
import org.apache.ftpserver.FtpServer;
import org.apache.ftpserver.FtpServerFactory;
import org.apache.ftpserver.command.CommandFactoryFactory;
import org.apache.ftpserver.listener.ListenerFactory;
import org.apache.hadoop.seaweed.ftp.service.filesystem.HdfsFileSystemManager;
import org.apache.hadoop.seaweed.ftp.service.filesystem.HdfsOverFtpSystem;
import org.apache.hadoop.seaweed.ftp.users.HdfsUserManager;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.File;
/**
* reference: https://github.com/iponweb/hdfs-over-ftp
*/
@Component
public class HFtpService {
private static Logger log = Logger.getLogger(HFtpService.class);
@Value("${ftp.port}")
private int port = 0;
@Value("${ftp.passive-address}")
private String passiveAddress;
@Value("${ftp.passive-ports}")
private String passivePorts;
@Value("${hdfs.uri}")
private String hdfsUri;
@Value("${seaweedFs.enable}")
private boolean seaweedFsEnable;
@Value("${seaweedFs.access}")
private String seaweedFsAccess;
@Value("${seaweedFs.replication}")
private String seaweedFsReplication;
private FtpServer ftpServer = null;
public void startServer() throws Exception {
log.info("Starting HDFS-Over-Ftp server. port: " + port + " passive-address: " + passiveAddress + " passive-ports: " + passivePorts + " hdfs-uri: " + hdfsUri);
HdfsOverFtpSystem.setHdfsUri(hdfsUri);
HdfsOverFtpSystem.setSeaweedFsEnable(seaweedFsEnable);
HdfsOverFtpSystem.setSeaweedFsAccess(seaweedFsAccess);
HdfsOverFtpSystem.setSeaweedFsReplication(seaweedFsReplication);
FtpServerFactory server = new FtpServerFactory();
server.setFileSystem(new HdfsFileSystemManager());
ListenerFactory factory = new ListenerFactory();
factory.setPort(port);
DataConnectionConfigurationFactory dccFactory = new DataConnectionConfigurationFactory();
dccFactory.setPassiveAddress("0.0.0.0");
dccFactory.setPassivePorts(passivePorts);
dccFactory.setPassiveExternalAddress(passiveAddress);
DataConnectionConfiguration dcc = dccFactory.createDataConnectionConfiguration();
factory.setDataConnectionConfiguration(dcc);
server.addListener("default", factory.createListener());
HdfsUserManager userManager = new HdfsUserManager();
final File file = loadResource("/users.properties");
userManager.setFile(file);
server.setUserManager(userManager);
CommandFactoryFactory cmFact = new CommandFactoryFactory();
cmFact.setUseDefaultCommands(true);
server.setCommandFactory(cmFact.createCommandFactory());
// start the server
ftpServer = server.createServer();
ftpServer.start();
}
public void stopServer() {
log.info("Stopping Hdfs-Over-Ftp server. port: " + port + " passive-address: " + passiveAddress + " passive-ports: " + passivePorts + " hdfs-uri: " + hdfsUri);
ftpServer.stop();
}
public boolean statusServer() {
try {
return !ftpServer.isStopped();
}catch (Exception e) {
return false;
}
}
private static File loadResource(String resourceName) {
return new File(System.getProperty("user.dir") + resourceName);
}
}

View File

@ -0,0 +1,333 @@
package org.apache.hadoop.seaweed.ftp.service.filesystem;
import org.apache.ftpserver.ftplet.FtpFile;
import org.apache.ftpserver.ftplet.User;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.seaweed.ftp.users.HdfsUser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* This class implements all actions to HDFS
*/
public class HdfsFileObject implements FtpFile {
private final Logger log = LoggerFactory.getLogger(HdfsFileObject.class);
private Path homePath;
private Path path;
private Path fullPath;
private HdfsUser user;
/**
* Constructs HdfsFileObject from path
*
* @param path path to represent object
* @param user accessor of the object
*/
public HdfsFileObject(String homePath, String path, User user) {
this.homePath = new Path(homePath);
this.path = new Path(path);
this.fullPath = new Path(homePath + path);
this.user = (HdfsUser) user;
}
public String getAbsolutePath() {
// strip the last '/' if necessary
String fullName = path.toString();
int filelen = fullName.length();
if ((filelen != 1) && (fullName.charAt(filelen - 1) == '/')) {
fullName = fullName.substring(0, filelen - 1);
}
return fullName;
}
public String getName() {
return path.getName();
}
/**
* HDFS has no hidden objects
*
* @return always false
*/
public boolean isHidden() {
return false;
}
/**
* Checks if the object is a directory
*
* @return true if the object is a directory
*/
public boolean isDirectory() {
try {
log.debug("is directory? : " + fullPath);
FileSystem dfs = HdfsOverFtpSystem.getDfs();
FileStatus fs = dfs.getFileStatus(fullPath);
return fs.isDir();
} catch (IOException e) {
log.debug(fullPath + " is not dir", e);
return false;
}
}
/**
* Checks if the object is a file
*
* @return true if the object is a file
*/
public boolean isFile() {
try {
FileSystem dfs = HdfsOverFtpSystem.getDfs();
return dfs.isFile(fullPath);
} catch (IOException e) {
log.debug(fullPath + " is not file", e);
return false;
}
}
/**
* Checks if the object does exist
*
* @return true if the object does exist
*/
public boolean doesExist() {
try {
FileSystem dfs = HdfsOverFtpSystem.getDfs();
dfs.getFileStatus(fullPath);
return true;
} catch (IOException e) {
// log.debug(path + " does not exist", e);
return false;
}
}
public boolean isReadable() {
return true;
}
public boolean isWritable() {
return true;
}
public boolean isRemovable() {
return true;
}
/**
* Get owner of the object
*
* @return owner of the object
*/
public String getOwnerName() {
return "root";
/*
try {
FileSystem dfs = HdfsOverFtpSystem.getDfs();
FileStatus fs = dfs.getFileStatus(fullPath);
String owner = fs.getOwner();
if(owner.length() == 0) {
return "root";
}
return owner;
} catch (IOException e) {
e.printStackTrace();
return null;
}
*/
}
/**
* Get group of the object
*
* @return group of the object
*/
public String getGroupName() {
return "root";
/*
try {
FileSystem dfs = HdfsOverFtpSystem.getDfs();
FileStatus fs = dfs.getFileStatus(fullPath);
String group = fs.getGroup();
if(group.length() == 0) {
return "root";
}
return group;
} catch (IOException e) {
e.printStackTrace();
return null;
}
*/
}
/**
* Get link count
*
* @return 3 is for a directory and 1 is for a file
*/
public int getLinkCount() {
return isDirectory() ? 3 : 1;
}
/**
* Get last modification date
*
* @return last modification date as a long
*/
public long getLastModified() {
try {
FileSystem dfs = HdfsOverFtpSystem.getDfs();
FileStatus fs = dfs.getFileStatus(fullPath);
return fs.getModificationTime();
} catch (IOException e) {
e.printStackTrace();
return 0;
}
}
public boolean setLastModified(long l) {
return false;
}
/**
* Get a size of the object
*
* @return size of the object in bytes
*/
public long getSize() {
try {
FileSystem dfs = HdfsOverFtpSystem.getDfs();
FileStatus fs = dfs.getFileStatus(fullPath);
log.debug("getSize(): " + fullPath + " : " + fs.getLen());
return fs.getLen();
} catch (IOException e) {
e.printStackTrace();
return 0;
}
}
public Object getPhysicalFile() {
return null;
}
/**
* Create a new dir from the object
*
* @return true if dir is created
*/
public boolean mkdir() {
try {
FileSystem fs = HdfsOverFtpSystem.getDfs();
fs.mkdirs(fullPath);
// fs.setOwner(path, user.getName(), user.getMainGroup());
return true;
} catch (IOException e) {
e.printStackTrace();
return false;
}
}
/**
* Delete object from the HDFS filesystem
*
* @return true if the object is deleted
*/
public boolean delete() {
try {
FileSystem dfs = HdfsOverFtpSystem.getDfs();
dfs.delete(fullPath, true);
return true;
} catch (IOException e) {
e.printStackTrace();
return false;
}
}
public boolean move(FtpFile ftpFile) {
try {
FileSystem dfs = HdfsOverFtpSystem.getDfs();
dfs.rename(fullPath, new Path(fullPath.getParent() + File.separator + ftpFile.getName()));
return true;
} catch (IOException e) {
e.printStackTrace();
return false;
}
}
/**
* List files of the directory
*
* @return List of files in the directory
*/
public List<FtpFile> listFiles() {
try {
FileSystem dfs = HdfsOverFtpSystem.getDfs();
FileStatus fileStats[] = dfs.listStatus(fullPath);
// get the virtual name of the base directory
String virtualFileStr = getAbsolutePath();
if (virtualFileStr.charAt(virtualFileStr.length() - 1) != '/') {
virtualFileStr += '/';
}
FtpFile[] virtualFiles = new FtpFile[fileStats.length];
for (int i = 0; i < fileStats.length; i++) {
File fileObj = new File(fileStats[i].getPath().toString());
String fileName = virtualFileStr + fileObj.getName();
virtualFiles[i] = new HdfsFileObject(homePath.toString(), fileName, user);
}
return Collections.unmodifiableList(Arrays.asList(virtualFiles));
} catch (IOException e) {
log.debug("", e);
return null;
}
}
/**
* Creates output stream to write to the object
*
* @param l is not used here
* @return OutputStream
* @throws IOException
*/
public OutputStream createOutputStream(long l) {
try {
FileSystem fs = HdfsOverFtpSystem.getDfs();
FSDataOutputStream out = fs.create(fullPath);
// fs.setOwner(fullPath, user.getName(), user.getMainGroup());
return out;
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
/**
* Creates input stream to read from the object
*
* @param l is not used here
* @return OutputStream
* @throws IOException
*/
public InputStream createInputStream(long l) {
try {
FileSystem dfs = HdfsOverFtpSystem.getDfs();
FSDataInputStream in = dfs.open(fullPath);
return in;
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}

View File

@ -0,0 +1,14 @@
package org.apache.hadoop.seaweed.ftp.service.filesystem;
import org.apache.ftpserver.ftplet.FileSystemFactory;
import org.apache.ftpserver.ftplet.FileSystemView;
import org.apache.ftpserver.ftplet.User;
/**
* Impelented FileSystemManager to use HdfsFileSystemView
*/
public class HdfsFileSystemManager implements FileSystemFactory {
public FileSystemView createFileSystemView(User user) {
return new HdfsFileSystemView(user);
}
}

View File

@ -0,0 +1,104 @@
package org.apache.hadoop.seaweed.ftp.service.filesystem;
import org.apache.ftpserver.ftplet.FileSystemView;
import org.apache.ftpserver.ftplet.FtpFile;
import org.apache.ftpserver.ftplet.User;
import org.apache.hadoop.fs.Path;
import java.io.File;
/**
* Implemented FileSystemView to use HdfsFileObject
*/
public class HdfsFileSystemView implements FileSystemView {
private String homePath;
private String currPath = File.separator;
private User user;
/**
* Constructor - set the user object.
*/
protected HdfsFileSystemView(User user) {
if (user == null) {
throw new IllegalArgumentException("user can not be null");
}
if (user.getHomeDirectory() == null) {
throw new IllegalArgumentException(
"User home directory can not be null");
}
this.homePath = user.getHomeDirectory();
this.user = user;
}
public FtpFile getHomeDirectory() {
return new HdfsFileObject(homePath, File.separator, user);
}
public FtpFile getWorkingDirectory() {
FtpFile fileObj;
if (currPath.equals(File.separator)) {
fileObj = new HdfsFileObject(homePath, File.separator, user);
} else {
fileObj = new HdfsFileObject(homePath, currPath, user);
}
return fileObj;
}
public boolean changeWorkingDirectory(String dir) {
Path path;
if (dir.startsWith(File.separator) || new Path(currPath).equals(new Path(dir))) {
path = new Path(dir);
} else if (currPath.length() > 1) {
path = new Path(currPath + File.separator + dir);
} else {
if(dir.startsWith("/")) {
path = new Path(dir);
}else {
path = new Path(File.separator + dir);
}
}
// 防止退回根目录
if (path.getName().equals("..")) {
path = new Path(File.separator);
}
HdfsFileObject file = new HdfsFileObject(homePath, path.toString(), user);
if (file.isDirectory()) {
currPath = path.toString();
return true;
} else {
return false;
}
}
public FtpFile getFile(String file) {
String path;
if (file.startsWith(File.separator)) {
path = file;
} else if (currPath.length() > 1) {
path = currPath + File.separator + file;
} else {
path = File.separator + file;
}
return new HdfsFileObject(homePath, path, user);
}
/**
* Is the file content random accessible?
*/
public boolean isRandomAccessible() {
return true;
}
/**
* Dispose file system view - does nothing.
*/
public void dispose() {
}
}

View File

@ -0,0 +1,72 @@
package org.apache.hadoop.seaweed.ftp.service.filesystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Class to store DFS connection
*/
public class HdfsOverFtpSystem {
private static FileSystem fs = null;
private static String hdfsUri;
private static boolean seaweedFsEnable;
private static String seaweedFsAccess;
private static String seaweedFsReplication;
private final static Logger log = LoggerFactory.getLogger(HdfsOverFtpSystem.class);
private static void hdfsInit() throws IOException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", hdfsUri);
if(seaweedFsEnable) {
configuration.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
configuration.set("fs.seaweed.volume.server.access", seaweedFsAccess);
configuration.set("fs.seaweed.replication", seaweedFsReplication);
}
fs = FileSystem.get(configuration);
log.info("HDFS load success");
}
/**
* Get dfs
*
* @return dfs
* @throws IOException
*/
public static FileSystem getDfs() throws IOException {
if (fs == null) {
hdfsInit();
}
return fs;
}
public static void setHdfsUri(String hdfsUri) {
HdfsOverFtpSystem.hdfsUri = hdfsUri;
}
public static String getHdfsUri() {
return hdfsUri;
}
public static void setSeaweedFsEnable(boolean seaweedFsEnable) {
HdfsOverFtpSystem.seaweedFsEnable = seaweedFsEnable;
}
public static void setSeaweedFsAccess(String seaweedFsAccess) {
HdfsOverFtpSystem.seaweedFsAccess = seaweedFsAccess;
}
public static void setSeaweedFsReplication(String seaweedFsReplication) {
HdfsOverFtpSystem.seaweedFsReplication = seaweedFsReplication;
}
}

View File

@ -0,0 +1,239 @@
package org.apache.hadoop.seaweed.ftp.users;
import org.apache.ftpserver.ftplet.Authority;
import org.apache.ftpserver.ftplet.AuthorizationRequest;
import org.apache.ftpserver.ftplet.User;
import org.apache.log4j.Logger;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class HdfsUser implements User, Serializable {
private static final long serialVersionUID = -47371353779731294L;
private String name = null;
private String password = null;
private int maxIdleTimeSec = 0; // no limit
private String homeDir = null;
private boolean isEnabled = true;
private List<? extends Authority> authorities = new ArrayList<Authority>();
private ArrayList<String> groups = new ArrayList<String>();
private Logger log = Logger.getLogger(HdfsUser.class);
/**
* Default constructor.
*/
public HdfsUser() {
}
/**
* Copy constructor.
*/
public HdfsUser(User user) {
name = user.getName();
password = user.getPassword();
authorities = user.getAuthorities();
maxIdleTimeSec = user.getMaxIdleTime();
homeDir = user.getHomeDirectory();
isEnabled = user.getEnabled();
}
public ArrayList<String> getGroups() {
return groups;
}
/**
* Get the main group of the user
*
* @return main group of the user
*/
public String getMainGroup() {
if (groups.size() > 0) {
return groups.get(0);
} else {
log.error("User " + name + " is not a memer of any group");
return "error";
}
}
/**
* Checks if user is a member of the group
*
* @param group to check
* @return true if the user id a member of the group
*/
public boolean isGroupMember(String group) {
for (String userGroup : groups) {
if (userGroup.equals(group)) {
return true;
}
}
return false;
}
/**
* Set users' groups
*
* @param groups to set
*/
public void setGroups(ArrayList<String> groups) {
if (groups.size() < 1) {
log.error("User " + name + " is not a memer of any group");
}
this.groups = groups;
}
/**
* Get the user name.
*/
public String getName() {
return name;
}
/**
* Set user name.
*/
public void setName(String name) {
this.name = name;
}
/**
* Get the user password.
*/
public String getPassword() {
return password;
}
/**
* Set user password.
*/
public void setPassword(String pass) {
password = pass;
}
public List<Authority> getAuthorities() {
if (authorities != null) {
return Collections.unmodifiableList(authorities);
} else {
return null;
}
}
public void setAuthorities(List<Authority> authorities) {
if (authorities != null) {
this.authorities = Collections.unmodifiableList(authorities);
} else {
this.authorities = null;
}
}
/**
* Get the maximum idle time in second.
*/
public int getMaxIdleTime() {
return maxIdleTimeSec;
}
/**
* Set the maximum idle time in second.
*/
public void setMaxIdleTime(int idleSec) {
maxIdleTimeSec = idleSec;
if (maxIdleTimeSec < 0) {
maxIdleTimeSec = 0;
}
}
/**
* Get the user enable status.
*/
public boolean getEnabled() {
return isEnabled;
}
/**
* Set the user enable status.
*/
public void setEnabled(boolean enb) {
isEnabled = enb;
}
/**
* Get the user home directory.
*/
public String getHomeDirectory() {
return homeDir;
}
/**
* Set the user home directory.
*/
public void setHomeDirectory(String home) {
homeDir = home;
}
/**
* String representation.
*/
public String toString() {
return name;
}
/**
* {@inheritDoc}
*/
public AuthorizationRequest authorize(AuthorizationRequest request) {
List<Authority> authorities = getAuthorities();
// check for no authorities at all
if (authorities == null) {
return null;
}
boolean someoneCouldAuthorize = false;
for (Authority authority : authorities) {
if (authority.canAuthorize(request)) {
someoneCouldAuthorize = true;
request = authority.authorize(request);
// authorization failed, return null
if (request == null) {
return null;
}
}
}
if (someoneCouldAuthorize) {
return request;
} else {
return null;
}
}
/**
* {@inheritDoc}
*/
public List<Authority> getAuthorities(Class<? extends Authority> clazz) {
List<Authority> selected = new ArrayList<Authority>();
for (Authority authority : authorities) {
if (authority.getClass().equals(clazz)) {
selected.add(authority);
}
}
return selected;
}
}

View File

@ -0,0 +1,453 @@
package org.apache.hadoop.seaweed.ftp.users;
import org.apache.ftpserver.FtpServerConfigurationException;
import org.apache.ftpserver.ftplet.*;
import org.apache.ftpserver.usermanager.*;
import org.apache.ftpserver.usermanager.impl.*;
import org.apache.ftpserver.util.BaseProperties;
import org.apache.ftpserver.util.IoUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.*;
public class HdfsUserManager extends AbstractUserManager {
private final Logger LOG = LoggerFactory
.getLogger(HdfsUserManager.class);
private final static String DEPRECATED_PREFIX = "FtpServer.user.";
private final static String PREFIX = "ftpserver.user.";
private static BaseProperties userDataProp;
private File userDataFile = new File("users.conf");
private boolean isConfigured = false;
private PasswordEncryptor passwordEncryptor = new Md5PasswordEncryptor();
/**
* Retrieve the file used to load and store users
*
* @return The file
*/
public File getFile() {
return userDataFile;
}
/**
* Set the file used to store and read users. Must be set before
* {@link #configure()} is called.
*
* @param propFile A file containing users
*/
public void setFile(File propFile) {
if (isConfigured) {
throw new IllegalStateException("Must be called before configure()");
}
this.userDataFile = propFile;
}
/**
* Retrieve the password encryptor used for this user manager
*
* @return The password encryptor. Default to {@link Md5PasswordEncryptor}
* if no other has been provided
*/
public PasswordEncryptor getPasswordEncryptor() {
return passwordEncryptor;
}
/**
* Set the password encryptor to use for this user manager
*
* @param passwordEncryptor The password encryptor
*/
public void setPasswordEncryptor(PasswordEncryptor passwordEncryptor) {
this.passwordEncryptor = passwordEncryptor;
}
/**
* Lazy init the user manager
*/
private void lazyInit() {
if (!isConfigured) {
configure();
}
}
/**
* Configure user manager.
*/
public void configure() {
isConfigured = true;
try {
userDataProp = new BaseProperties();
if (userDataFile != null && userDataFile.exists()) {
FileInputStream fis = null;
try {
fis = new FileInputStream(userDataFile);
userDataProp.load(fis);
} finally {
IoUtils.close(fis);
}
}
} catch (IOException e) {
throw new FtpServerConfigurationException(
"Error loading user data file : "
+ userDataFile.getAbsolutePath(), e);
}
convertDeprecatedPropertyNames();
}
private void convertDeprecatedPropertyNames() {
Enumeration<?> keys = userDataProp.propertyNames();
boolean doSave = false;
while (keys.hasMoreElements()) {
String key = (String) keys.nextElement();
if (key.startsWith(DEPRECATED_PREFIX)) {
String newKey = PREFIX
+ key.substring(DEPRECATED_PREFIX.length());
userDataProp.setProperty(newKey, userDataProp.getProperty(key));
userDataProp.remove(key);
doSave = true;
}
}
if (doSave) {
try {
saveUserData();
} catch (FtpException e) {
throw new FtpServerConfigurationException(
"Failed to save updated user data", e);
}
}
}
public synchronized void save(User usr, boolean renamePush) throws FtpException {
lazyInit();
userDataProp.setProperty(PREFIX + usr.getName() + ".rename.push", renamePush);
save(usr);
}
/**
* Save user data. Store the properties.
*/
public synchronized void save(User usr) throws FtpException {
lazyInit();
// null value check
if (usr.getName() == null) {
throw new NullPointerException("User name is null.");
}
String thisPrefix = PREFIX + usr.getName() + '.';
// set other properties
userDataProp.setProperty(thisPrefix + ATTR_PASSWORD, getPassword(usr));
String home = usr.getHomeDirectory();
if (home == null) {
home = "/";
}
userDataProp.setProperty(thisPrefix + ATTR_HOME, home);
userDataProp.setProperty(thisPrefix + ATTR_ENABLE, usr.getEnabled());
userDataProp.setProperty(thisPrefix + ATTR_WRITE_PERM, usr
.authorize(new WriteRequest()) != null);
userDataProp.setProperty(thisPrefix + ATTR_MAX_IDLE_TIME, usr
.getMaxIdleTime());
TransferRateRequest transferRateRequest = new TransferRateRequest();
transferRateRequest = (TransferRateRequest) usr
.authorize(transferRateRequest);
if (transferRateRequest != null) {
userDataProp.setProperty(thisPrefix + ATTR_MAX_UPLOAD_RATE,
transferRateRequest.getMaxUploadRate());
userDataProp.setProperty(thisPrefix + ATTR_MAX_DOWNLOAD_RATE,
transferRateRequest.getMaxDownloadRate());
} else {
userDataProp.remove(thisPrefix + ATTR_MAX_UPLOAD_RATE);
userDataProp.remove(thisPrefix + ATTR_MAX_DOWNLOAD_RATE);
}
// request that always will succeed
ConcurrentLoginRequest concurrentLoginRequest = new ConcurrentLoginRequest(
0, 0);
concurrentLoginRequest = (ConcurrentLoginRequest) usr
.authorize(concurrentLoginRequest);
if (concurrentLoginRequest != null) {
userDataProp.setProperty(thisPrefix + ATTR_MAX_LOGIN_NUMBER,
concurrentLoginRequest.getMaxConcurrentLogins());
userDataProp.setProperty(thisPrefix + ATTR_MAX_LOGIN_PER_IP,
concurrentLoginRequest.getMaxConcurrentLoginsPerIP());
} else {
userDataProp.remove(thisPrefix + ATTR_MAX_LOGIN_NUMBER);
userDataProp.remove(thisPrefix + ATTR_MAX_LOGIN_PER_IP);
}
saveUserData();
}
/**
* @throws FtpException
*/
private void saveUserData() throws FtpException {
File dir = userDataFile.getAbsoluteFile().getParentFile();
if (dir != null && !dir.exists() && !dir.mkdirs()) {
String dirName = dir.getAbsolutePath();
throw new FtpServerConfigurationException(
"Cannot create directory for user data file : " + dirName);
}
// save user data
FileOutputStream fos = null;
try {
fos = new FileOutputStream(userDataFile);
userDataProp.store(fos, "Generated file - don't edit (please)");
} catch (IOException ex) {
LOG.error("Failed saving user data", ex);
throw new FtpException("Failed saving user data", ex);
} finally {
IoUtils.close(fos);
}
}
public synchronized void list() throws FtpException {
lazyInit();
Map dataMap = new HashMap();
Enumeration<String> propNames = (Enumeration<String>) userDataProp.propertyNames();
ArrayList<String> a = Collections.list(propNames);
a.remove("i18nMap");//去除i18nMap
for(String attrName : a){
// dataMap.put(attrName, propNames.);
}
}
/**
* Delete an user. Removes all this user entries from the properties. After
* removing the corresponding from the properties, save the data.
*/
public synchronized void delete(String usrName) throws FtpException {
lazyInit();
// remove entries from properties
String thisPrefix = PREFIX + usrName + '.';
Enumeration<?> propNames = userDataProp.propertyNames();
ArrayList<String> remKeys = new ArrayList<String>();
while (propNames.hasMoreElements()) {
String thisKey = propNames.nextElement().toString();
if (thisKey.startsWith(thisPrefix)) {
remKeys.add(thisKey);
}
}
Iterator<String> remKeysIt = remKeys.iterator();
while (remKeysIt.hasNext()) {
userDataProp.remove(remKeysIt.next());
}
saveUserData();
}
/**
* Get user password. Returns the encrypted value.
* <p/>
* <pre>
* If the password value is not null
* password = new password
* else
* if user does exist
* password = old password
* else
* password = &quot;&quot;
* </pre>
*/
private String getPassword(User usr) {
String name = usr.getName();
String password = usr.getPassword();
if (password != null) {
password = passwordEncryptor.encrypt(password);
} else {
String blankPassword = passwordEncryptor.encrypt("");
if (doesExist(name)) {
String key = PREFIX + name + '.' + ATTR_PASSWORD;
password = userDataProp.getProperty(key, blankPassword);
} else {
password = blankPassword;
}
}
return password;
}
/**
* Get all user names.
*/
public synchronized String[] getAllUserNames() {
lazyInit();
// get all user names
String suffix = '.' + ATTR_HOME;
ArrayList<String> ulst = new ArrayList<String>();
Enumeration<?> allKeys = userDataProp.propertyNames();
int prefixlen = PREFIX.length();
int suffixlen = suffix.length();
while (allKeys.hasMoreElements()) {
String key = (String) allKeys.nextElement();
if (key.endsWith(suffix)) {
String name = key.substring(prefixlen);
int endIndex = name.length() - suffixlen;
name = name.substring(0, endIndex);
ulst.add(name);
}
}
Collections.sort(ulst);
return ulst.toArray(new String[0]);
}
private ArrayList<String> parseGroups(String groupsLine) {
String groupsArray[] = groupsLine.split(",");
return new ArrayList(Arrays.asList(groupsArray));
}
public static synchronized boolean getUserRenamePush(String userName) {
return userDataProp.getBoolean(PREFIX + userName + ".rename.push", false);
}
/**
* Load user data.
*/
public synchronized User getUserByName(String userName) {
lazyInit();
if (!doesExist(userName)) {
return null;
}
String baseKey = PREFIX + userName + '.';
HdfsUser user = new HdfsUser();
user.setName(userName);
user.setEnabled(userDataProp.getBoolean(baseKey + ATTR_ENABLE, true));
user.setHomeDirectory(userDataProp
.getProperty(baseKey + ATTR_HOME, "/"));
// user.setGroups(parseGroups(userDataProp
// .getProperty(baseKey + "groups")));
List<Authority> authorities = new ArrayList<Authority>();
if (userDataProp.getBoolean(baseKey + ATTR_WRITE_PERM, false)) {
authorities.add(new WritePermission());
}
int maxLogin = userDataProp.getInteger(baseKey + ATTR_MAX_LOGIN_NUMBER,
0);
int maxLoginPerIP = userDataProp.getInteger(baseKey
+ ATTR_MAX_LOGIN_PER_IP, 0);
authorities.add(new ConcurrentLoginPermission(maxLogin, maxLoginPerIP));
int uploadRate = userDataProp.getInteger(
baseKey + ATTR_MAX_UPLOAD_RATE, 0);
int downloadRate = userDataProp.getInteger(baseKey
+ ATTR_MAX_DOWNLOAD_RATE, 0);
authorities.add(new TransferRatePermission(downloadRate, uploadRate));
user.setAuthorities(authorities);
user.setMaxIdleTime(userDataProp.getInteger(baseKey
+ ATTR_MAX_IDLE_TIME, 0));
return user;
}
/**
* User existance check
*/
public synchronized boolean doesExist(String name) {
lazyInit();
String key = PREFIX + name + '.' + ATTR_HOME;
return userDataProp.containsKey(key);
}
/**
* User authenticate method
*/
public synchronized User authenticate(Authentication authentication)
throws AuthenticationFailedException {
lazyInit();
if (authentication instanceof UsernamePasswordAuthentication) {
UsernamePasswordAuthentication upauth = (UsernamePasswordAuthentication) authentication;
String user = upauth.getUsername();
String password = upauth.getPassword();
if (user == null) {
throw new AuthenticationFailedException("Authentication failed");
}
if (password == null) {
password = "";
}
String storedPassword = userDataProp.getProperty(PREFIX + user + '.'
+ ATTR_PASSWORD);
if (storedPassword == null) {
// user does not exist
throw new AuthenticationFailedException("Authentication failed");
}
if (passwordEncryptor.matches(password, storedPassword)) {
return getUserByName(user);
} else {
throw new AuthenticationFailedException("Authentication failed");
}
} else if (authentication instanceof AnonymousAuthentication) {
if (doesExist("anonymous")) {
return getUserByName("anonymous");
} else {
throw new AuthenticationFailedException("Authentication failed");
}
} else {
throw new IllegalArgumentException(
"Authentication not supported by this user manager");
}
}
/**
* Close the user manager - remove existing entries.
*/
public synchronized void dispose() {
if (userDataProp != null) {
userDataProp.clear();
userDataProp = null;
}
}
}

View File

@ -0,0 +1,15 @@
server:
port: 8080
ftp:
port: 2222
passive-address: localhost
passive-ports: 30000-30999
hdfs:
uri: seaweedfs://localhost:8888
seaweedFs:
enable: true
access: direct # direct/filerProxy/publicUrl
replication: "000"

View File

@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
<id>package</id>
<formats>
<!-- 指定打包格式支持的打包格式有zip、tar、tar.gz (or tgz)、tar.bz2 (or tbz2)、jar、dir、war可以同时指定多个打包格式 -->
<format>tar.gz</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<outputDirectory>/</outputDirectory>
<includes>
<include>application.yml</include>
<include>logback-spring.xml</include>
<include>users.properties</include>
<include>kafka-producer.properties</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.build.directory}</directory>
<outputDirectory>/</outputDirectory>
<includes>
<include>*.jar</include>
</includes>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>lib</outputDirectory>
<scope>runtime</scope>
<unpack>false</unpack>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
<!--定义日志文件的存储地址 勿在 LogBack 的配置中使用相对路径-->
<property name="LOG_HOME" value="${user.dir}/logs/" />
<!-- 控制台输出 -->
<appender name="Stdout" class="ch.qos.logback.core.ConsoleAppender">
<!-- 日志输出编码 -->
<layout class="ch.qos.logback.classic.PatternLayout">
<!--格式化输出:%d表示日期%thread表示线程名%-5level级别从左显示5个字符宽度%msg日志消息%n是换行符-->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
</pattern>
</layout>
</appender>
<!-- 按照每天生成日志文件 -->
<appender name="RollingFile"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<File>${LOG_HOME}/fileLog.log</File>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/fileLog.log.%d.%i</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100 MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
<encoder>
<pattern>
%d %p (%file:%line\)- %m%n
</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!-- 日志输出级别 -->
<root level="info">
<appender-ref ref="Stdout" />
<appender-ref ref="RollingFile" />
</root>
</configuration>

View File

@ -0,0 +1,12 @@
#Generated file - don't edit (please)
#Thu Mar 11 19:11:12 CST 2021
ftpserver.user.test.idletime=0
ftpserver.user.test.maxloginperip=0
ftpserver.user.test.userpassword=44664D4D827C740293D2AA244FB60445
ftpserver.user.test.enableflag=true
ftpserver.user.test.maxloginnumber=0
ftpserver.user.test.rename.push=true
ftpserver.user.test.homedirectory=/buckets/test/
ftpserver.user.test.downloadrate=0
ftpserver.user.test.writepermission=true
ftpserver.user.test.uploadrate=0