seaweedfs/weed/mq/schema/to_parquet_value.go

93 lines
3.4 KiB
Go
Raw Normal View History

2024-04-18 14:49:21 +08:00
package schema
import (
"fmt"
parquet "github.com/parquet-go/parquet-go"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
2024-04-26 00:14:37 +08:00
func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) {
2024-04-22 15:42:18 +08:00
switch fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType:
2024-04-25 14:04:47 +08:00
var parquetValue parquet.Value
parquetValue, err = toParquetValue(fieldValue)
2024-04-22 15:42:18 +08:00
if err != nil {
2024-04-25 14:04:47 +08:00
return
2024-04-22 15:42:18 +08:00
}
2024-04-26 00:14:37 +08:00
rowBuilder.Add(levels.startColumnIndex, parquetValue)
2024-04-25 14:04:47 +08:00
// fmt.Printf("rowBuilder.Add %d %v\n", columnIndex, parquetValue)
2024-04-22 15:42:18 +08:00
case *schema_pb.Type_ListType:
2024-04-26 00:14:37 +08:00
rowBuilder.Next(levels.startColumnIndex)
2024-04-25 14:04:47 +08:00
// fmt.Printf("rowBuilder.Next %d\n", columnIndex)
2024-04-22 15:42:18 +08:00
elementType := fieldType.GetListType().ElementType
for _, value := range fieldValue.GetListValue().Values {
2024-04-26 00:14:37 +08:00
if err = rowBuilderVisit(rowBuilder, elementType, levels, value); err != nil {
2024-04-25 14:04:47 +08:00
return
2024-04-18 14:49:21 +08:00
}
}
}
2024-04-25 14:04:47 +08:00
return
2024-04-22 15:42:18 +08:00
}
2024-04-26 14:59:30 +08:00
func AddRecordValue(rowBuilder *parquet.RowBuilder, recordType *schema_pb.RecordType, parquetLevels *ParquetLevels, recordValue *schema_pb.RecordValue) error {
2024-04-26 00:14:37 +08:00
visitor := func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) {
return rowBuilderVisit(rowBuilder, fieldType, levels, fieldValue)
2024-04-22 15:42:18 +08:00
}
fieldType := &schema_pb.Type{Kind: &schema_pb.Type_RecordType{RecordType: recordType}}
fieldValue := &schema_pb.Value{Kind: &schema_pb.Value_RecordValue{RecordValue: recordValue}}
2024-04-26 00:14:37 +08:00
return doVisitValue(fieldType, parquetLevels, fieldValue, visitor)
2024-04-18 14:49:21 +08:00
}
// typeValueVisitor is a function that is called for each value in a schema_pb.Value
// Find the column index.
// intended to be used in RowBuilder.Add(columnIndex, value)
2024-04-26 00:14:37 +08:00
type typeValueVisitor func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error)
2024-04-18 14:49:21 +08:00
// endIndex is exclusive
// same logic as RowBuilder.configure in row_builder.go
2024-04-26 00:14:37 +08:00
func doVisitValue(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value, visitor typeValueVisitor) (err error) {
2024-04-18 14:49:21 +08:00
switch fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType:
2024-04-26 00:14:37 +08:00
return visitor(fieldType, levels, fieldValue)
2024-04-18 14:49:21 +08:00
case *schema_pb.Type_ListType:
2024-04-26 00:14:37 +08:00
return visitor(fieldType, levels, fieldValue)
2024-04-18 14:49:21 +08:00
case *schema_pb.Type_RecordType:
for _, field := range fieldType.GetRecordType().Fields {
fieldValue, found := fieldValue.GetRecordValue().Fields[field.Name]
if !found {
// TODO check this if no such field found
2024-04-26 00:14:37 +08:00
continue
2024-04-18 14:49:21 +08:00
}
2024-04-26 00:14:37 +08:00
fieldLevels := levels.levels[field.Name]
err = doVisitValue(field.Type, fieldLevels, fieldValue, visitor)
2024-04-18 14:49:21 +08:00
if err != nil {
return
}
}
return
}
return
}
func toParquetValue(value *schema_pb.Value) (parquet.Value, error) {
switch value.Kind.(type) {
case *schema_pb.Value_BoolValue:
return parquet.BooleanValue(value.GetBoolValue()), nil
case *schema_pb.Value_Int32Value:
return parquet.Int32Value(value.GetInt32Value()), nil
case *schema_pb.Value_Int64Value:
return parquet.Int64Value(value.GetInt64Value()), nil
case *schema_pb.Value_FloatValue:
return parquet.FloatValue(value.GetFloatValue()), nil
case *schema_pb.Value_DoubleValue:
return parquet.DoubleValue(value.GetDoubleValue()), nil
case *schema_pb.Value_BytesValue:
return parquet.ByteArrayValue(value.GetBytesValue()), nil
case *schema_pb.Value_StringValue:
return parquet.ByteArrayValue([]byte(value.GetStringValue())), nil
default:
return parquet.NullValue(), fmt.Errorf("unknown value type: %T", value.Kind)
}
}