Add HTTP server support using Crow; configure CMake for dependencies and implement basic CRUD operations

This commit is contained in:
harish876 2025-04-11 06:18:10 +00:00
parent 46b995573b
commit 2262f3ca43
7 changed files with 487 additions and 42 deletions

3
.gitmodules vendored
View File

@ -7,3 +7,6 @@
[submodule "third_party/rapidjson"]
path = third_party/rapidjson
url = https://github.com/Tencent/rapidjson.git
[submodule "third_party/crow"]
path = third_party/crow
url = https://github.com/CrowCpp/Crow.git

View File

@ -36,6 +36,42 @@ endif(WIN32)
option(LEVELDB_BUILD_TESTS "Build LevelDB's unit tests" ON)
option(LEVELDB_BUILD_BENCHMARKS "Build LevelDB's benchmarks" ON)
option(LEVELDB_INSTALL "Install LevelDB's header and library" ON)
option(LEVELDB_HTTP "Build HTTP Server" ON)
# Add this section for HTTP server with Crow
if(LEVELDB_HTTP)
# Find required dependencies for Crow
find_package(Threads REQUIRED)
# Check for Boost - optional but recommended for Crow
find_package(Boost COMPONENTS system QUIET)
if(Boost_FOUND)
set(CROW_USE_BOOST ON)
message(STATUS "Boost found. Enabling Boost support for Crow.")
else()
set(CROW_USE_BOOST OFF)
message(STATUS "Boost not found. Building Crow without Boost support.")
endif()
# OpenSSL for HTTPS support (optional)
find_package(OpenSSL QUIET)
if(OpenSSL_FOUND)
set(CROW_ENABLE_SSL ON)
message(STATUS "OpenSSL found. Enabling HTTPS support.")
else()
set(CROW_ENABLE_SSL OFF)
message(STATUS "OpenSSL not found. HTTPS support disabled.")
endif()
# Set Crow options as needed
set(CROW_BUILD_EXAMPLES OFF CACHE BOOL "Build Crow examples" FORCE)
set(CROW_BUILD_TESTS OFF CACHE BOOL "Build Crow tests" FORCE)
# Add the http_server directory to the build
add_subdirectory(http_server)
endif(LEVELDB_HTTP)
include(CheckIncludeFile)
check_include_file("unistd.h" HAVE_UNISTD_H)

View File

@ -1,34 +1,56 @@
#include <cassert>
#include <chrono>
#include <iostream>
#include <leveldb/db.h>
#include <leveldb/filter_policy.h>
#include <rapidjson/document.h>
#include <sstream> // for stringstream
#include <sstream>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include "leveldb/options.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
using namespace std;
using namespace std::chrono;
int main() {
leveldb::DB* db;
leveldb::Options options;
options.filter_policy = leveldb::NewBloomFilterPolicy(10);
options.primary_key = "id";
options.secondary_key = "age";
options.create_if_missing = true;
leveldb::Status status =
leveldb::DB::Open(options, "/opt/leveldbplus/test_level_db_idx", &db);
assert(status.ok());
leveldb::ReadOptions roptions;
leveldb::WriteOptions woptions;
void printUsage() {
std::cout
<< "Usage: db_index [OPTIONS]\n"
<< "Options:\n"
<< " --insert Run only data insertion phase\n"
<< " --query Run only query phase (must have existing "
"data)\n"
<< " --run-all Run both insertion and query phases "
"(default)\n"
<< " --use-index Run only secondary index benchmark\n"
<< " --no-index Run only full scan benchmark\n"
<< " --records N Number of records to insert (default: 10000)\n"
<< " --target-age N Age value to search for (default: 30)\n"
<< " --db-path PATH Database path (default: "
"/opt/leveldbplus/test_level_db_idx)\n"
<< " --help Print this help message\n";
}
std::cout << "Starting Index test..." << std::endl;
// Check if directory exists
bool directoryExists(const std::string& path) {
struct stat info;
return stat(path.c_str(), &info) == 0 && (info.st_mode & S_IFDIR);
}
// Add 10,000 key-value pairs
for (int i = 0; i < 10000; ++i) {
// Insert data into the database
void insertData(leveldb::DB* db, int numRecords,
const leveldb::WriteOptions& woptions) {
std::cout << "==========================================\n";
std::cout << "INSERTING DATA\n";
std::cout << "==========================================\n";
std::cout << "Inserting " << numRecords << " records...\n";
auto startInsert = high_resolution_clock::now();
for (int i = 0; i < numRecords; ++i) {
std::stringstream ss;
ss << "{\n \"id\": " << i << ",\n \"age\": " << (i % 50 + 10)
<< ",\n \"name\": \"User" << i << "\"\n}";
@ -40,29 +62,50 @@ int main() {
}
}
/*
Using Secondary Index
*/
auto endInsert = high_resolution_clock::now();
auto insertDuration = duration_cast<milliseconds>(endInsert - startInsert);
std::cout << "Insertion took " << insertDuration.count() << " ms\n\n";
}
// Run queries with secondary index
void queryWithIndex(leveldb::DB* db, int targetAge, int numRecords,
const leveldb::ReadOptions& roptions) {
std::cout << "==========================================\n";
std::cout << "USING SECONDARY INDEX\n";
std::cout << "==========================================\n";
auto startWithIndex = high_resolution_clock::now();
vector<leveldb::SKeyReturnVal> values;
leveldb::Status s =
db->Get(roptions, leveldb::Slice(std::to_string(30)), &values, 10000);
leveldb::Status s = db->Get(
roptions, leveldb::Slice(std::to_string(targetAge)), &values, numRecords);
auto endWithIndex = high_resolution_clock::now();
auto withIndexDuration =
duration_cast<microseconds>(endWithIndex - startWithIndex);
if (!s.ok()) {
std::cout << "Error calling new get method " << s.ToString() << std::endl;
std::cout << "Error calling new get method: " << s.ToString() << std::endl;
}
std::cout << "Found " << values.size()
<< " records with age 30 using secondary index" << std::endl;
std::cout << "------------------------------------------------\n";
/*
How this would without secondary index
*/
rapidjson::Document doc;
std::cout << "Found " << values.size() << " records with age " << targetAge
<< " using secondary index\n";
std::cout << "Query took " << withIndexDuration.count()
<< " microseconds\n\n";
}
// Run queries without secondary index (full scan)
void queryWithoutIndex(leveldb::DB* db, int targetAge,
const leveldb::ReadOptions& roptions) {
std::cout << "==========================================\n";
std::cout << "WITHOUT SECONDARY INDEX (FULL SCAN)\n";
std::cout << "==========================================\n";
auto startWithoutIndex = high_resolution_clock::now();
leveldb::Iterator* it = db->NewIterator(roptions);
int count = 0;
for (it->SeekToFirst(); it->Valid(); it->Next()) {
leveldb::Slice key = it->key();
leveldb::Slice value = it->value();
std::string json_string = value.ToString();
@ -77,17 +120,161 @@ int main() {
if (doc.HasMember("age") && doc["age"].IsInt()) {
int age = doc["age"].GetInt();
if (age == 30) {
// std::cout << "Key: " << key.ToString() << ", Value: " << json_string
// << std::endl;
if (age == targetAge) {
count++;
}
}
}
auto endWithoutIndex = high_resolution_clock::now();
auto withoutIndexDuration =
duration_cast<microseconds>(endWithoutIndex - startWithoutIndex);
assert(it->status().ok()); // Check for any errors found during the scan
std::cout << "Found " << count
<< " records with age 30 without using secondary index"
<< std::endl;
std::cout << "Found " << count << " records with age " << targetAge
<< " without using secondary index\n";
std::cout << "Query took " << withoutIndexDuration.count()
<< " microseconds\n\n";
delete it;
}
// Run performance comparison
void runComparison(leveldb::DB* db, int targetAge, int numRecords,
const leveldb::ReadOptions& roptions) {
std::cout << "==========================================\n";
std::cout << "PERFORMANCE COMPARISON\n";
std::cout << "==========================================\n";
vector<leveldb::SKeyReturnVal> values;
auto startWithIndex = high_resolution_clock::now();
db->Get(roptions, leveldb::Slice(std::to_string(targetAge)), &values,
numRecords);
auto endWithIndex = high_resolution_clock::now();
auto withIndexDuration =
duration_cast<microseconds>(endWithIndex - startWithIndex);
leveldb::Iterator* it = db->NewIterator(roptions);
int count = 0;
auto startWithoutIndex = high_resolution_clock::now();
for (it->SeekToFirst(); it->Valid(); it->Next()) {
rapidjson::Document doc;
doc.Parse<0>(it->value().ToString().c_str());
if (!doc.HasParseError() && doc.HasMember("age") && doc["age"].IsInt()) {
if (doc["age"].GetInt() == targetAge) {
count++;
}
}
}
auto endWithoutIndex = high_resolution_clock::now();
auto withoutIndexDuration =
duration_cast<microseconds>(endWithoutIndex - startWithoutIndex);
delete it;
std::cout << "With Index: " << withIndexDuration.count() << " microseconds\n";
std::cout << "Without Index: " << withoutIndexDuration.count()
<< " microseconds\n";
double speedup =
(double)withoutIndexDuration.count() / withIndexDuration.count();
std::cout << "Speedup: " << speedup << "x\n";
}
int main(int argc, char* argv[]) {
// Default settings
bool runInsert = true;
bool runQuery = true;
bool runWithIndex = true;
bool runWithoutIndex = true;
int numRecords = 10000;
int targetAge = 30;
string dbPath = "/opt/leveldbplus/test_level_db_idx";
// Parse command line arguments
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "--insert") == 0) {
runInsert = true;
runQuery = false;
} else if (strcmp(argv[i], "--query") == 0) {
runInsert = false;
runQuery = true;
} else if (strcmp(argv[i], "--run-all") == 0) {
runInsert = true;
runQuery = true;
} else if (strcmp(argv[i], "--use-index") == 0) {
runWithIndex = true;
runWithoutIndex = false;
} else if (strcmp(argv[i], "--no-index") == 0) {
runWithIndex = false;
runWithoutIndex = true;
} else if (strcmp(argv[i], "--records") == 0 && i + 1 < argc) {
numRecords = atoi(argv[++i]);
} else if (strcmp(argv[i], "--target-age") == 0 && i + 1 < argc) {
targetAge = atoi(argv[++i]);
} else if (strcmp(argv[i], "--db-path") == 0 && i + 1 < argc) {
dbPath = argv[++i];
} else if (strcmp(argv[i], "--help") == 0) {
printUsage();
return 0;
} else {
std::cerr << "Unknown option: " << argv[i] << std::endl;
printUsage();
return 1;
}
}
// Check if we're just querying but the database doesn't exist
if (!runInsert && runQuery && !directoryExists(dbPath)) {
std::cerr << "Error: Cannot run query phase without existing database at "
<< dbPath << std::endl;
std::cerr << "Run with --insert first or provide valid --db-path"
<< std::endl;
return 1;
}
// Setup database
leveldb::DB* db;
leveldb::Options options;
options.filter_policy = leveldb::NewBloomFilterPolicy(10);
options.primary_key = "id";
options.secondary_key = "age";
options.create_if_missing = true;
leveldb::Status status = leveldb::DB::Open(options, dbPath, &db);
if (!status.ok()) {
std::cerr << "Error opening database: " << status.ToString() << std::endl;
return 1;
}
leveldb::ReadOptions roptions;
leveldb::WriteOptions woptions;
std::cout << "==========================================\n";
std::cout << "LevelDB Secondary Index Benchmark\n";
std::cout << "==========================================\n";
std::cout << "Records: " << numRecords << "\n";
std::cout << "Target Age: " << targetAge << "\n";
std::cout << "DB Path: " << dbPath << "\n";
std::cout << "==========================================\n\n";
// Insertion phase
if (runInsert) {
insertData(db, numRecords, woptions);
}
// Query phase
if (runQuery) {
if (runWithIndex) {
queryWithIndex(db, targetAge, numRecords, roptions);
}
if (runWithoutIndex) {
queryWithoutIndex(db, targetAge, roptions);
}
if (runWithIndex && runWithoutIndex) {
runComparison(db, targetAge, numRecords, roptions);
}
}
delete db;
delete options.filter_policy;

View File

@ -1210,7 +1210,6 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& s_key,
&result_set, top_k_outputs);
if (imm != nullptr && top_k_outputs - acc->size() > 0) {
int mem_size = acc->size();
imm->Get(s_key, snapshot, acc, &s, this->options_.secondary_key,
&result_set, top_k_outputs);
}
@ -1218,15 +1217,15 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& s_key,
if (top_k_outputs > (int)(acc->size())) {
s = current->Get(options, lkey, acc, &stats, this->options_.secondary_key,
top_k_outputs, &result_set, this);
// have_stat_update = true;
have_stat_update = true;
}
std::sort_heap(acc->begin(), acc->end(), NewestFirst);
// std::sort_heap(acc->begin(), acc->end(), NewestFirst);
mutex_.Lock();
}
// /*if (have_stat_update && current->UpdateStats(stats)) {
// MaybeScheduleCompaction();
// }*/
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();

View File

@ -0,0 +1,56 @@
# HTTP Server for LevelDB
# Add Crow as a header-only library
add_library(crow INTERFACE)
target_include_directories(crow INTERFACE
${CMAKE_SOURCE_DIR}/third_party/crow/include
)
# Find RapidJSON
find_package(RapidJSON QUIET)
if(NOT RapidJSON_FOUND)
# If not found through find_package, use bundled version or check common paths
if(EXISTS "${CMAKE_SOURCE_DIR}/third_party/rapidjson/include")
set(RAPIDJSON_INCLUDE_DIR "${CMAKE_SOURCE_DIR}/third_party/rapidjson/include")
else()
# Check common system paths
find_path(RAPIDJSON_INCLUDE_DIR rapidjson/document.h
PATHS
/usr/include
/usr/local/include
/opt/local/include
)
if(NOT RAPIDJSON_INCLUDE_DIR)
message(FATAL_ERROR "RapidJSON not found. Please install it or update CMAKE_PREFIX_PATH.")
endif()
endif()
endif()
# Build the HTTP server executable
add_executable(leveldb_http_server http_server.cc)
# Include RapidJSON headers
target_include_directories(leveldb_http_server PRIVATE
${RAPIDJSON_INCLUDE_DIR}
)
target_link_libraries(leveldb_http_server PRIVATE
leveldb
crow
Threads::Threads
)
if(CROW_USE_BOOST)
target_link_libraries(leveldb_http_server PRIVATE ${Boost_LIBRARIES})
target_compile_definitions(leveldb_http_server PRIVATE CROW_USE_BOOST)
endif()
if(CROW_ENABLE_SSL)
target_link_libraries(leveldb_http_server PRIVATE OpenSSL::SSL OpenSSL::Crypto)
target_compile_definitions(leveldb_http_server PRIVATE CROW_ENABLE_SSL)
endif()
# Set compiler options
target_compile_features(leveldb_http_server PRIVATE cxx_std_11)

163
http_server/http_server.cc Normal file
View File

@ -0,0 +1,163 @@
#include <crow.h>
#include <iostream>
#include <string>
#include <vector>
#include "leveldb/db.h"
#include "leveldb/options.h"
#include "leveldb/status.h"
#include "rapidjson/document.h"
int main(int argc, char* argv[]) {
// Process command line arguments
if (argc < 2) {
std::cerr << "Usage: " << argv[0] << " <database_path> [port]" << std::endl;
return 1;
}
std::string db_path = argv[1];
int port = (argc > 2) ? std::stoi(argv[2]) : 8080;
// Open the database
leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true;
options.primary_key = "id";
options.secondary_key = "age";
leveldb::Status status = leveldb::DB::Open(options, db_path, &db);
if (!status.ok()) {
std::cerr << "Unable to open/create database: " << status.ToString()
<< std::endl;
return 1;
}
crow::SimpleApp app;
CROW_ROUTE(app, "/db/get/<string>")
.methods("GET"_method)([db](const crow::request& req,
std::string primary_key) {
// Get value by primary key
std::string value;
leveldb::Status s =
db->Get(leveldb::ReadOptions(), primary_key, &value);
if (s.ok()) {
// Return the JSON value
return crow::response(200, value);
} else if (s.IsNotFound()) {
return crow::response(404, "{\"error\": \"Key not found\"}");
} else {
return crow::response(500, "{\"error\": \"" + s.ToString() + "\"}");
}
});
// Define route for secondary key operations
CROW_ROUTE(app, "/db/query")
.methods("GET"_method)([db](const crow::request& req) {
// Get query parameters
auto secondary_key = req.url_params.get("key");
auto limit_param = req.url_params.get("limit");
auto use_secondary = req.url_params.get("use_secondary");
int limit = 10;
if (limit_param) {
try {
limit = std::stoi(limit_param);
} catch (...) {
return crow::response(400,
"{\"error\": \"Invalid 'limit' parameter\"}");
}
}
if (!use_secondary) {
std::unique_ptr<leveldb::Iterator> it(
db->NewIterator(leveldb::ReadOptions()));
std::vector<std::string> results;
int target_key_value = std::stoi(secondary_key);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
std::string value = it->value().ToString();
std::string sec_key_str;
rapidjson::Document doc;
if (doc.Parse<0>(value.c_str()).HasParseError() ||
!doc.HasMember("age")) {
continue;
}
if (doc.HasMember("age") && doc["age"].IsInt()) {
int age = doc["age"].GetInt();
if (age == target_key_value) {
results.push_back(value);
}
}
}
if (!it->status().ok()) {
return crow::response(500, "{\"error\": \"Iterator error: " +
it->status().ToString() + "\"}");
}
std::string json_results = "[";
for (size_t i = 0; i < results.size() && i < limit; i++) {
if (i > 0) json_results += ",";
json_results += results[i];
}
json_results += "]";
if (results.empty()) {
return crow::response(200, "application/json", "[]");
} else {
return crow::response(200, "application/json", json_results);
}
}
if (!secondary_key) {
return crow::response(400,
"{\"error\": \"Missing 'key' parameter\"}");
}
std::vector<leveldb::SKeyReturnVal> results;
leveldb::Status s =
db->Get(leveldb::ReadOptions(), leveldb::Slice(secondary_key),
&results, limit);
if (s.ok()) {
std::string json_results = "[";
for (size_t i = 0; i < results.size(); i++) {
if (i > 0) json_results += ",";
json_results += results[i].value;
}
json_results += "]";
return crow::response(200, "application/json", json_results);
} else if (s.IsNotFound()) {
return crow::response(404, "{\"error\": \"No records found\"}");
} else {
return crow::response(500, "{\"error\": \"" + s.ToString() + "\"}");
}
});
CROW_ROUTE(app, "/db/put")
.methods("POST"_method)([db](const crow::request& req) {
auto body = req.body;
if (body.empty()) {
return crow::response(400, "{\"error\": \"Empty request body\"}");
}
leveldb::Status s = db->Put(leveldb::WriteOptions(), body);
if (s.ok()) {
return crow::response(200, "{\"status\": \"success\"}");
} else {
return crow::response(500, "{\"error\": \"" + s.ToString() + "\"}");
}
});
// Start the server
std::cout << "Starting LevelDB HTTP server on port " << port << std::endl;
std::cout << "Database path: " << db_path << std::endl;
app.port(port).multithreaded().run();
delete db;
return 0;
}

1
third_party/crow vendored Submodule

@ -0,0 +1 @@
Subproject commit 1c5a0349658159f9c175f84ada7013034778a115