Merge branch 'master' into mq2

chrislu 2024-09-29 20:47:16 -07:00
commit 190b2fc276
97 changed files with 2181 additions and 1635 deletions

go.mod (40 changed lines)

@ -4,7 +4,7 @@ go 1.22.0
require (
cloud.google.com/go v0.115.1 // indirect
cloud.google.com/go/pubsub v1.42.0
cloud.google.com/go/pubsub v1.43.0
cloud.google.com/go/storage v1.43.0
github.com/Azure/azure-pipeline-go v0.2.3
github.com/Azure/azure-storage-blob-go v0.15.0
@ -53,7 +53,7 @@ require (
github.com/json-iterator/go v1.1.12
github.com/karlseguin/ccache/v2 v2.0.8
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/reedsolomon v1.12.3
github.com/klauspost/reedsolomon v1.12.4
github.com/kurin/blazer v0.5.3
github.com/lib/pq v1.10.9
github.com/linxGnu/grocksdb v1.9.3
@ -106,13 +106,13 @@ require (
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
golang.org/x/image v0.18.0
golang.org/x/net v0.29.0
golang.org/x/oauth2 v0.22.0 // indirect
golang.org/x/oauth2 v0.23.0 // indirect
golang.org/x/sys v0.25.0
golang.org/x/text v0.18.0 // indirect
golang.org/x/tools v0.24.0
golang.org/x/tools v0.25.0
golang.org/x/xerrors v0.0.0-20240716161551-93cc26a95ae9 // indirect
google.golang.org/api v0.195.0
google.golang.org/genproto v0.0.0-20240823204242-4ba0660f739c // indirect
google.golang.org/api v0.198.0
google.golang.org/genproto v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.66.2
google.golang.org/protobuf v1.34.2
gopkg.in/inf.v0 v0.9.1 // indirect
@ -128,7 +128,7 @@ require (
github.com/Jille/raft-grpc-transport v1.6.1
github.com/arangodb/go-driver v1.6.2
github.com/armon/go-metrics v0.4.1
github.com/aws/aws-sdk-go-v2 v1.30.5
github.com/aws/aws-sdk-go-v2 v1.31.0
github.com/aws/aws-sdk-go-v2/config v1.27.33
github.com/aws/aws-sdk-go-v2/credentials v1.17.32
github.com/aws/aws-sdk-go-v2/service/s3 v1.61.2
@ -158,10 +158,10 @@ require (
)
require (
cloud.google.com/go/auth v0.9.1 // indirect
cloud.google.com/go/auth v0.9.4 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.4 // indirect
cloud.google.com/go/compute/metadata v0.5.0 // indirect
cloud.google.com/go/iam v1.1.13 // indirect
cloud.google.com/go/compute/metadata v0.5.1 // indirect
cloud.google.com/go/iam v1.2.0 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 // indirect
@ -201,7 +201,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/sso v1.22.7 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.7 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.30.7 // indirect
github.com/aws/smithy-go v1.20.4 // indirect
github.com/aws/smithy-go v1.21.0 // indirect
github.com/boltdb/bolt v1.3.1 // indirect
github.com/bradenaw/juniper v0.15.2 // indirect
github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect
@ -236,7 +236,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/google/s2a-go v0.1.8 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-hclog v1.6.3 // indirect
@ -327,18 +327,18 @@ require (
github.com/zeebo/errs v1.3.0 // indirect
go.etcd.io/bbolt v1.3.10 // indirect
go.etcd.io/etcd/api/v3 v3.5.16 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect
go.opentelemetry.io/otel v1.29.0 // indirect
go.opentelemetry.io/otel/metric v1.29.0 // indirect
go.opentelemetry.io/otel/trace v1.29.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/mod v0.20.0 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/term v0.24.0 // indirect
golang.org/x/time v0.6.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240823204242-4ba0660f739c // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/validator.v2 v2.0.1 // indirect

go.sum (96 changed lines)

@ -84,8 +84,8 @@ cloud.google.com/go/assuredworkloads v1.7.0/go.mod h1:z/736/oNmtGAyU47reJgGN+KVo
cloud.google.com/go/assuredworkloads v1.8.0/go.mod h1:AsX2cqyNCOvEQC8RMPnoc0yEarXQk6WEKkxYfL6kGIo=
cloud.google.com/go/assuredworkloads v1.9.0/go.mod h1:kFuI1P78bplYtT77Tb1hi0FMxM0vVpRC7VVoJC3ZoT0=
cloud.google.com/go/assuredworkloads v1.10.0/go.mod h1:kwdUQuXcedVdsIaKgKTp9t0UJkE5+PAVNhdQm4ZVq2E=
cloud.google.com/go/auth v0.9.1 h1:+pMtLEV2k0AXKvs/tGZojuj6QaioxfUjOpMsG5Gtx+w=
cloud.google.com/go/auth v0.9.1/go.mod h1:Sw8ocT5mhhXxFklyhT12Eiy0ed6tTrPMCJjSI8KhYLk=
cloud.google.com/go/auth v0.9.4 h1:DxF7imbEbiFu9+zdKC6cKBko1e8XeJnipNqIbWZ+kDI=
cloud.google.com/go/auth v0.9.4/go.mod h1:SHia8n6//Ya940F1rLimhJCjjx7KE17t0ctFEci3HkA=
cloud.google.com/go/auth/oauth2adapt v0.2.4 h1:0GWE/FUsXhf6C+jAkWgYm7X9tK8cuEIfy19DBn6B6bY=
cloud.google.com/go/auth/oauth2adapt v0.2.4/go.mod h1:jC/jOpwFP6JBxhB3P5Rr0a9HLMC/Pe3eaL4NmdvqPtc=
cloud.google.com/go/automl v1.5.0/go.mod h1:34EjfoFGMZ5sgJ9EoLsRtdPSNZLcfflJR39VbVNS2M0=
@ -156,8 +156,8 @@ cloud.google.com/go/compute/metadata v0.1.0/go.mod h1:Z1VN+bulIf6bt4P/C37K4DyZYZ
cloud.google.com/go/compute/metadata v0.2.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
cloud.google.com/go/compute/metadata v0.2.1/go.mod h1:jgHgmJd2RKBGzXqF5LR2EZMGxBkeanZ9wwa75XHJgOM=
cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA=
cloud.google.com/go/compute/metadata v0.5.0 h1:Zr0eK8JbFv6+Wi4ilXAR8FJ3wyNdpxHKJNPos6LTZOY=
cloud.google.com/go/compute/metadata v0.5.0/go.mod h1:aHnloV2TPI38yx4s9+wAZhHykWvVCfu7hQbF+9CWoiY=
cloud.google.com/go/compute/metadata v0.5.1 h1:NM6oZeZNlYjiwYje+sYFjEpP0Q0zCan1bmQW/KmIrGs=
cloud.google.com/go/compute/metadata v0.5.1/go.mod h1:C66sj2AluDcIqakBq/M8lw8/ybHgOZqin2obFxa/E5k=
cloud.google.com/go/contactcenterinsights v1.3.0/go.mod h1:Eu2oemoePuEFc/xKFPjbTuPSj0fYJcPls9TFlPNnHHY=
cloud.google.com/go/contactcenterinsights v1.4.0/go.mod h1:L2YzkGbPsv+vMQMCADxJoT9YiTTnSEd6fEvCeHTYVck=
cloud.google.com/go/contactcenterinsights v1.6.0/go.mod h1:IIDlT6CLcDoyv79kDv8iWxMSTZhLxSCofVV5W6YFM/w=
@ -273,8 +273,8 @@ cloud.google.com/go/iam v0.7.0/go.mod h1:H5Br8wRaDGNc8XP3keLc4unfUUZeyH3Sfl9XpQE
cloud.google.com/go/iam v0.8.0/go.mod h1:lga0/y3iH6CX7sYqypWJ33hf7kkfXJag67naqGESjkE=
cloud.google.com/go/iam v0.11.0/go.mod h1:9PiLDanza5D+oWFZiH1uG+RnRCfEGKoyl6yo4cgWZGY=
cloud.google.com/go/iam v0.12.0/go.mod h1:knyHGviacl11zrtZUoDuYpDgLjvr28sLQaG0YB2GYAY=
cloud.google.com/go/iam v1.1.13 h1:7zWBXG9ERbMLrzQBRhFliAV+kjcRToDTgQT3CTwYyv4=
cloud.google.com/go/iam v1.1.13/go.mod h1:K8mY0uSXwEXS30KrnVb+j54LB/ntfZu1dr+4zFMNbus=
cloud.google.com/go/iam v1.2.0 h1:kZKMKVNk/IsSSc/udOb83K0hL/Yh/Gcqpz+oAkoIFN8=
cloud.google.com/go/iam v1.2.0/go.mod h1:zITGuWgsLZxd8OwAlX+eMFgZDXzBm7icj1PVTYG766Q=
cloud.google.com/go/iap v1.4.0/go.mod h1:RGFwRJdihTINIe4wZ2iCP0zF/qu18ZwyKxrhMhygBEc=
cloud.google.com/go/iap v1.5.0/go.mod h1:UH/CGgKd4KyohZL5Pt0jSKE4m3FR51qg6FKQ/z/Ix9A=
cloud.google.com/go/iap v1.6.0/go.mod h1:NSuvI9C/j7UdjGjIde7t7HBz+QTwBcapPE07+sSRcLk=
@ -288,8 +288,8 @@ cloud.google.com/go/kms v1.4.0/go.mod h1:fajBHndQ+6ubNw6Ss2sSd+SWvjL26RNo/dr7uxs
cloud.google.com/go/kms v1.5.0/go.mod h1:QJS2YY0eJGBg3mnDfuaCyLauWwBJiHRboYxJ++1xJNg=
cloud.google.com/go/kms v1.6.0/go.mod h1:Jjy850yySiasBUDi6KFUwUv2n1+o7QZFyuUJg6OgjA0=
cloud.google.com/go/kms v1.9.0/go.mod h1:qb1tPTgfF9RQP8e1wq4cLFErVuTJv7UsSC915J8dh3w=
cloud.google.com/go/kms v1.18.5 h1:75LSlVs60hyHK3ubs2OHd4sE63OAMcM2BdSJc2bkuM4=
cloud.google.com/go/kms v1.18.5/go.mod h1:yXunGUGzabH8rjUPImp2ndHiGolHeWJJ0LODLedicIY=
cloud.google.com/go/kms v1.19.0 h1:x0OVJDl6UH1BSX4THKlMfdcFWoE4ruh90ZHuilZekrU=
cloud.google.com/go/kms v1.19.0/go.mod h1:e4imokuPJUc17Trz2s6lEXFDt8bgDmvpVynH39bdrHM=
cloud.google.com/go/language v1.4.0/go.mod h1:F9dRpNFQmJbkaop6g0JhSBXCNlO90e1KWx5iDdxbWic=
cloud.google.com/go/language v1.6.0/go.mod h1:6dJ8t3B+lUYfStgls25GusK04NLh3eDLQnWM3mdEbhI=
cloud.google.com/go/language v1.7.0/go.mod h1:DJ6dYN/W+SQOjF8e1hLQXMF21AkH2w9wiPzPCJa2MIE=
@ -303,8 +303,8 @@ cloud.google.com/go/logging v1.7.0/go.mod h1:3xjP2CjkM3ZkO73aj4ASA5wRPGGCRrPIAeN
cloud.google.com/go/longrunning v0.1.1/go.mod h1:UUFxuDWkv22EuY93jjmDMFT5GPQKeFVJBIF6QlTqdsE=
cloud.google.com/go/longrunning v0.3.0/go.mod h1:qth9Y41RRSUE69rDcOn6DdK3HfQfsUI0YSmW3iIlLJc=
cloud.google.com/go/longrunning v0.4.1/go.mod h1:4iWDqhBZ70CvZ6BfETbvam3T8FMvLK+eFj0E6AaRQTo=
cloud.google.com/go/longrunning v0.5.12 h1:5LqSIdERr71CqfUsFlJdBpOkBH8FBCFD7P1nTWy3TYE=
cloud.google.com/go/longrunning v0.5.12/go.mod h1:S5hMV8CDJ6r50t2ubVJSKQVv5u0rmik5//KgLO3k4lU=
cloud.google.com/go/longrunning v0.6.0 h1:mM1ZmaNsQsnb+5n1DNPeL0KwQd9jQRqSqSDEkBZr+aI=
cloud.google.com/go/longrunning v0.6.0/go.mod h1:uHzSZqW89h7/pasCWNYdUpwGz3PcVWhrWupreVPYLts=
cloud.google.com/go/managedidentities v1.3.0/go.mod h1:UzlW3cBOiPrzucO5qWkNkh0w33KFtBJU281hacNvsdE=
cloud.google.com/go/managedidentities v1.4.0/go.mod h1:NWSBYbEMgqmbZsLIyKvxrYbtqOsxY1ZrGM+9RgDqInM=
cloud.google.com/go/managedidentities v1.5.0/go.mod h1:+dWcZ0JlUmpuxpIDfyP5pP5y0bLdRwOS4Lp7gMni/LA=
@ -377,8 +377,8 @@ cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjp
cloud.google.com/go/pubsub v1.26.0/go.mod h1:QgBH3U/jdJy/ftjPhTkyXNj543Tin1pRYcdcPRnFIRI=
cloud.google.com/go/pubsub v1.27.1/go.mod h1:hQN39ymbV9geqBnfQq6Xf63yNhUAhv9CZhzp5O6qsW0=
cloud.google.com/go/pubsub v1.28.0/go.mod h1:vuXFpwaVoIPQMGXqRyUQigu/AX1S3IWugR9xznmcXX8=
cloud.google.com/go/pubsub v1.42.0 h1:PVTbzorLryFL5ue8esTS2BfehUs0ahyNOY9qcd+HMOs=
cloud.google.com/go/pubsub v1.42.0/go.mod h1:KADJ6s4MbTwhXmse/50SebEhE4SmUwHi48z3/dHar1Y=
cloud.google.com/go/pubsub v1.43.0 h1:s3Qx+F96J7Kwey/uVHdK3QxFLIlOvvw4SfMYw2jFjb4=
cloud.google.com/go/pubsub v1.43.0/go.mod h1:LNLfqItblovg7mHWgU5g84Vhza4J8kTxx0YqIeTzcXY=
cloud.google.com/go/pubsublite v1.5.0/go.mod h1:xapqNQ1CuLfGi23Yda/9l4bBCKz/wC3KIJ5gKcxveZg=
cloud.google.com/go/pubsublite v1.6.0/go.mod h1:1eFCS0U11xlOuMFV/0iBqw3zP12kddMeCbj/F3FSj9k=
cloud.google.com/go/recaptchaenterprise v1.3.1/go.mod h1:OdD+q+y4XGeAlxRaMn1Y7/GveP6zmq76byL6tjPE7d4=
@ -632,8 +632,8 @@ github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJ
github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4=
github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU=
github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/aws/aws-sdk-go-v2 v1.30.5 h1:mWSRTwQAb0aLE17dSzztCVJWI9+cRMgqebndjwDyK0g=
github.com/aws/aws-sdk-go-v2 v1.30.5/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0=
github.com/aws/aws-sdk-go-v2 v1.31.0 h1:3V05LbxTSItI5kUqNwhJrrrY1BAXxXt0sN0l72QmG5U=
github.com/aws/aws-sdk-go-v2 v1.31.0/go.mod h1:ztolYtaEUtdpf9Wftr31CJfLVjOnD/CVRkKOOYgF8hA=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 h1:70PVAiL15/aBMh5LThwgXdSQorVr91L127ttckI9QQU=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4/go.mod h1:/MQxMqci8tlqDH+pjmoLu1i0tbWCUP1hhyMRuFxpQCw=
github.com/aws/aws-sdk-go-v2/config v1.27.33 h1:Nof9o/MsmH4oa0s2q9a0k7tMz5x/Yj5k06lDODWz3BU=
@ -672,8 +672,8 @@ github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.7 h1:/Cfdu0XV3mONYKaOt1Gr0k1K
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.7/go.mod h1:bCbAxKDqNvkHxRaIMnyVPXPo+OaPRwvmgzMxbz1VKSA=
github.com/aws/aws-sdk-go-v2/service/sts v1.30.7 h1:NKTa1eqZYw8tiHSRGpP0VtTdub/8KNk8sDkNPFaOKDE=
github.com/aws/aws-sdk-go-v2/service/sts v1.30.7/go.mod h1:NXi1dIAGteSaRLqYgarlhP/Ij0cFT+qmCwiJqWh/U5o=
github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4=
github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/aws/smithy-go v1.21.0 h1:H7L8dtDRk0P1Qm6y0ji7MCYMQObJ5R9CRpyPhRUkLYA=
github.com/aws/smithy-go v1.21.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
@ -1051,8 +1051,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.1.0/go.mod h1:17drOmN3MwGY
github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg=
github.com/googleapis/enterprise-certificate-proxy v0.2.1/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k=
github.com/googleapis/enterprise-certificate-proxy v0.2.3/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k=
github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs=
github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0=
github.com/googleapis/enterprise-certificate-proxy v0.3.4 h1:XYIDZApgAnrN1c855gTgghdIA6Stxb52D5RnLI1SLyw=
github.com/googleapis/enterprise-certificate-proxy v0.3.4/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0=
@ -1192,8 +1192,8 @@ github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02
github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/reedsolomon v1.12.3 h1:tzUznbfc3OFwJaTebv/QdhnFf2Xvb7gZ24XaHLBPmdc=
github.com/klauspost/reedsolomon v1.12.3/go.mod h1:3K5rXwABAvzGeR01r6pWZieUALXO/Tq7bFKGIb4m4WI=
github.com/klauspost/reedsolomon v1.12.4 h1:5aDr3ZGoJbgu/8+j45KtUJxzYm8k08JGtB9Wx1VQ4OA=
github.com/klauspost/reedsolomon v1.12.4/go.mod h1:d3CzOMOt0JXGIFZm1StgkyF14EYr3xneR2rNWo7NcMU=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/koofr/go-httpclient v0.0.0-20240520111329-e20f8f203988 h1:CjEMN21Xkr9+zwPmZPaJJw+apzVbjGL5uK/6g9Q2jGU=
github.com/koofr/go-httpclient v0.0.0-20240520111329-e20f8f203988/go.mod h1:/agobYum3uo/8V6yPVnq+R82pyVGCeuWW5arT4Txn8A=
@ -1608,8 +1608,8 @@ github.com/zeebo/errs v1.3.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtC
github.com/zeebo/pcg v1.0.1 h1:lyqfGeWiv4ahac6ttHs+I5hwtH/+1mrhlCtVNQM2kHo=
github.com/zeebo/pcg v1.0.1/go.mod h1:09F0S9iiKrwn9rlI5yjLkmrug154/YRW6KnnXVDM/l4=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.einride.tech/aip v0.67.1 h1:d/4TW92OxXBngkSOwWS2CH5rez869KpKMaN44mdxkFI=
go.einride.tech/aip v0.67.1/go.mod h1:ZGX4/zKw8dcgzdLsrvpOOGxfxI2QSk12SlP7d6c0/XI=
go.einride.tech/aip v0.68.0 h1:4seM66oLzTpz50u4K1zlJyOXQ3tCzcJN7I22tKkjipw=
go.einride.tech/aip v0.68.0/go.mod h1:7y9FF8VtPWqpxuAxl0KQWqaULxW4zFIesD6zF5RIHHg=
go.etcd.io/bbolt v1.3.10 h1:+BqfJTcCzTItrop8mq/lbzL8wSGtj94UO/3U31shqG0=
go.etcd.io/bbolt v1.3.10/go.mod h1:bK3UQLPJZly7IlNmV7uVHJDxfe5aK9Ll93e/74Y9oEQ=
go.etcd.io/etcd/api/v3 v3.5.16 h1:WvmyJVbjWqK4R1E+B12RRHz3bRGy9XVfh++MgbN+6n0=
@ -1629,18 +1629,18 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 h1:9G6E0TXzGFVfTnawRzrPl83iHOAV7L8NJiR8RSGYV1g=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0/go.mod h1:azvtTADFQJA8mX80jIH/akaE7h+dbm/sVuaHqN13w74=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 h1:4K4tsIXefpVJtvA/8srF4V4y0akAoPHkIslgAkjixJA=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0/go.mod h1:jjdQuTGVsXV4vSs+CJ2qYDeDPf9yIJV23qlIzBm73Vg=
go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q=
go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s=
go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE=
go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg=
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 h1:r6I7RJCN86bpD/FQwedZ0vSixDpwuWREjW9oRMsmqDc=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0/go.mod h1:B9yO6b04uB80CzjedvewuqDhxJxi11s7/GtiGa8bAjI=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+nBOA/+LUkobKGW1ydGcn+G3vRw9+g5HwCphpk=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8=
go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw=
go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8=
go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc=
go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8=
go.opentelemetry.io/otel/sdk v1.29.0 h1:vkqKjk7gwhS8VaWb0POZKmIEDimRCMsopNYnriHyryo=
go.opentelemetry.io/otel/sdk v1.29.0/go.mod h1:pM8Dx5WKnvxLCb+8lG1PRNIDxu9g9b9g59Qr7hfAAok=
go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4=
go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
@ -1761,8 +1761,8 @@ golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0=
golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.21.0 h1:vvrHzRwRfVKSiLrG+d4FMl/Qi4ukBCE6kZlTUkDYRT0=
golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -1864,8 +1864,8 @@ golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783/go.mod h1:h4gKUeWbJ4rQPri
golang.org/x/oauth2 v0.4.0/go.mod h1:RznEsdpjGAINPTOF0UH/t+xJ75L18YO3Ho6Pyn+uRec=
golang.org/x/oauth2 v0.5.0/go.mod h1:9/XBHVqLaWO3/BRHs5jbpYCnOZVjj5V0ndyaAM7KB4I=
golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw=
golang.org/x/oauth2 v0.22.0 h1:BzDx2FehcG7jJwgWLELCdmLuxk2i+x9UDpSiss2u0ZA=
golang.org/x/oauth2 v0.22.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs=
golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -2100,8 +2100,8 @@ golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg=
golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps=
golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24=
golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ=
golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE=
golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@ -2175,8 +2175,8 @@ google.golang.org/api v0.106.0/go.mod h1:2Ts0XTHNVWxypznxWOYUeI4g3WdP9Pk2Qk58+a/
google.golang.org/api v0.107.0/go.mod h1:2Ts0XTHNVWxypznxWOYUeI4g3WdP9Pk2Qk58+a/O9MY=
google.golang.org/api v0.108.0/go.mod h1:2Ts0XTHNVWxypznxWOYUeI4g3WdP9Pk2Qk58+a/O9MY=
google.golang.org/api v0.110.0/go.mod h1:7FC4Vvx1Mooxh8C5HWjzZHcavuS2f6pmJpZx60ca7iI=
google.golang.org/api v0.195.0 h1:Ude4N8FvTKnnQJHU48RFI40jOBgIrL8Zqr3/QeST6yU=
google.golang.org/api v0.195.0/go.mod h1:DOGRWuv3P8TU8Lnz7uQc4hyNqrBpMtD9ppW3wBJurgc=
google.golang.org/api v0.198.0 h1:OOH5fZatk57iN0A7tjJQzt6aPfYQ1JiWkt1yGseazks=
google.golang.org/api v0.198.0/go.mod h1:/Lblzl3/Xqqk9hw/yS97TImKTUwnf1bv89v7+OagJzc=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@ -2310,12 +2310,12 @@ google.golang.org/genproto v0.0.0-20230209215440-0dfe4f8abfcc/go.mod h1:RGgjbofJ
google.golang.org/genproto v0.0.0-20230216225411-c8e22ba71e44/go.mod h1:8B0gmkoRebU8ukX6HP+4wrVQUY1+6PkQ44BSyIlflHA=
google.golang.org/genproto v0.0.0-20230222225845-10f96fb3dbec/go.mod h1:3Dl5ZL0q0isWJt+FVcfpQyirqemEuLAK/iFvg1UP1Hw=
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s=
google.golang.org/genproto v0.0.0-20240823204242-4ba0660f739c h1:TYOEhrQMrNDTAd2rX9m+WgGr8Ku6YNuj1D7OX6rWSok=
google.golang.org/genproto v0.0.0-20240823204242-4ba0660f739c/go.mod h1:2rC5OendXvZ8wGEo/cSLheztrZDZaSoHanUcd1xtZnw=
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 h1:wKguEg1hsxI2/L3hUYrpo1RVi48K+uTyzKqprwLXsb8=
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142/go.mod h1:d6be+8HhtEtucleCbxpPW9PA9XwISACu8nvpPqF0BVo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240823204242-4ba0660f739c h1:Kqjm4WpoWvwhMPcrAczoTyMySQmYa9Wy2iL6Con4zn8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240823204242-4ba0660f739c/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/genproto v0.0.0-20240903143218-8af14fe29dc1 h1:BulPr26Jqjnd4eYDVe+YvyR7Yc2vJGkO5/0UxD0/jZU=
google.golang.org/genproto v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:hL97c3SYopEHblzpxRL4lSs523++l8DYxGM1FQiYmb4=
google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 h1:hjSy6tcFQZ171igDaN5QHOw2n6vx40juYbC/x67CEhc=
google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:qpvKtACPCQhAdu3PyQgV4l3LMXZEtft7y8QcarRsp9I=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=


@ -895,7 +895,7 @@ s3:
# For more information, visit: https://container-object-storage-interface.github.io/docs/deployment-guide
cosi:
enabled: false
image: "ghcr.io/seaweedfs/seaweedfs-cosi-driver:v0.1.1"
image: "ghcr.io/seaweedfs/seaweedfs-cosi-driver:v0.1.2"
driverName: "seaweedfs.objectstorage.k8s.io"
bucketClassName: "seaweedfs"
endpoint: ""


@ -164,6 +164,9 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (
entry, err = actualStore.FindEntry(ctx, fp)
// glog.V(4).Infof("FindEntry %s: %v", fp, err)
if err != nil {
if fsw.CanDropWholeBucket() && strings.Contains(err.Error(), "Table") && strings.Contains(err.Error(), "doesn't exist") {
err = filer_pb.ErrNotFound
}
return nil, err
}


@ -90,6 +90,7 @@ message HeartbeatResponse {
uint32 metrics_interval_seconds = 4;
repeated StorageBackend storage_backends = 5;
repeated string duplicated_uuids = 6;
bool preallocate = 7;
}
message VolumeInformationMessage {

File diff suppressed because it is too large


@ -226,6 +226,7 @@ message VolumeDeleteResponse {
message VolumeMarkReadonlyRequest {
uint32 volume_id = 1;
bool persist = 2;
}
message VolumeMarkReadonlyResponse {
}

File diff suppressed because it is too large


@ -51,6 +51,8 @@ type S3ApiServer struct {
}
func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
startTsNs := time.Now().UnixNano()
v := util.GetViper()
signingKey := v.GetString("jwt.filer_signing.key")
v.SetDefault("jwt.filer_signing.expires_after_seconds", 10)
@ -101,7 +103,7 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer
s3ApiServer.registerRouter(router)
go s3ApiServer.subscribeMetaEvents("s3", time.Now().UnixNano(), filer.DirectoryEtcRoot, []string{option.BucketsPath})
go s3ApiServer.subscribeMetaEvents("s3", startTsNs, filer.DirectoryEtcRoot, []string{option.BucketsPath})
return s3ApiServer, nil
}


@ -99,7 +99,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
return
}
fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
if err != nil {
return nil, nil, err
}
@ -130,7 +130,8 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
return nil, nil, err
}
fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, contentLength, so)
fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(w, r, r.Body, chunkSize, fileName, contentType, contentLength, so)
if err != nil {
return nil, nil, err
}


@ -1,11 +1,65 @@
package weed_server
import (
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/stats"
"io"
"math"
)
const MergeChunkMinCount int = 1000
func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
//TODO merge consecutive smaller chunks into a large chunk to reduce number of chunks
return inputChunks, nil
// Only merge when small chunks are numerous and make up at least half of the file's chunks
var chunkSize = fs.option.MaxMB * 1024 * 1024
var smallChunk, sumChunk int
var minOffset int64 = math.MaxInt64
for _, chunk := range inputChunks {
if chunk.IsChunkManifest {
continue
}
if chunk.Size < uint64(chunkSize/2) {
smallChunk++
if chunk.Offset < minOffset {
minOffset = chunk.Offset
}
}
sumChunk++
}
if smallChunk < MergeChunkMinCount || smallChunk < sumChunk/2 {
return inputChunks, nil
}
return fs.mergeChunks(so, inputChunks, minOffset)
}
func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) {
chunkedFileReader := filer.NewChunkStreamReaderFromFiler(fs.filer.MasterClient, inputChunks)
_, mergeErr = chunkedFileReader.Seek(chunkOffset, io.SeekCurrent)
if mergeErr != nil {
return nil, mergeErr
}
mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so)
if mergeErr != nil {
return
}
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkMerge).Inc()
for _, chunk := range inputChunks {
if chunk.Offset < chunkOffset || chunk.IsChunkManifest {
mergedChunks = append(mergedChunks, chunk)
}
}
garbage, err := filer.MinusChunks(fs.lookupFileId, inputChunks, mergedChunks)
if err != nil {
glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s",
mergedChunks, inputChunks)
return
}
fs.filer.DeleteChunksNotRecursive(garbage)
return
}
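
For readers skimming the hunk above, the merge trigger boils down to one counting rule. The following is a minimal standalone sketch of that rule only; the chunkInfo type and shouldMergeSmallChunks helper are illustrative, not part of this commit. A chunk counts as "small" when it is under half the configured chunk size, and merging happens only when at least MergeChunkMinCount small chunks exist and they make up at least half of all non-manifest chunks.

package main

import "fmt"

const mergeChunkMinCount = 1000 // mirrors MergeChunkMinCount in the commit

// chunkInfo is a simplified stand-in for filer_pb.FileChunk.
type chunkInfo struct {
	Size            uint64
	IsChunkManifest bool
}

// shouldMergeSmallChunks reproduces the decision made in maybeMergeChunks:
// merge only when small chunks are numerous enough and dominate the file.
func shouldMergeSmallChunks(chunks []chunkInfo, chunkSize int) bool {
	var smallChunk, sumChunk int
	for _, c := range chunks {
		if c.IsChunkManifest {
			continue // manifest chunks are neither merged nor counted
		}
		if c.Size < uint64(chunkSize/2) {
			smallChunk++
		}
		sumChunk++
	}
	return smallChunk >= mergeChunkMinCount && smallChunk >= sumChunk/2
}

func main() {
	// 2000 one-MB chunks under a 4MB chunk size limit: all are small, so merging triggers.
	chunks := make([]chunkInfo, 2000)
	for i := range chunks {
		chunks[i] = chunkInfo{Size: 1 << 20}
	}
	fmt.Println(shouldMergeSmallChunks(chunks, 4<<20)) // true
}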


@ -27,7 +27,7 @@ var bufPool = sync.Pool{
},
}
func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
func (fs *FilerServer) uploadRequestToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
query := r.URL.Query()
isAppend := isAppend(r)
@ -45,7 +45,13 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
chunkOffset = offsetInt
}
return fs.uploadReaderToChunks(reader, chunkOffset, chunkSize, fileName, contentType, isAppend, so)
}
func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64, chunkSize int32, fileName, contentType string, isAppend bool, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
md5Hash = md5.New()
chunkOffset = startOffset
var partReader = io.NopCloser(io.TeeReader(reader, md5Hash))
var wg sync.WaitGroup


@ -151,6 +151,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
if err := stream.Send(&master_pb.HeartbeatResponse{
VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
Preallocate: ms.preallocateSize > 0,
}); err != nil {
glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
return err


@ -314,6 +314,10 @@ func processEachCmd(reg *regexp.Regexp, line string, commandEnv *shell.CommandEn
for _, c := range shell.Commands {
if c.Name() == cmd {
if c.HasTag(shell.ResourceHeavy) {
glog.Warningf("%s is resource heavy and should not run on master", cmd)
continue
}
glog.V(0).Infof("executing: %s %v", cmd, args)
if err := c.Do(args, commandEnv, os.Stdout); err != nil {
glog.V(0).Infof("error: %v", err)


@ -130,7 +130,7 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
}
stateMachine := StateMachine{topo: option.Topo}
s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "")
s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, s.serverAddr.ToGrpcAddress())
if err != nil {
glog.V(0).Infoln(err)
return nil, err


@ -164,7 +164,7 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv
// rare case 1.5: it will be unlucky if heartbeat happened between step 1 and 2.
// step 2: mark local volume as readonly
err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId))
err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId), req.GetPersist())
if err != nil {
glog.Errorf("volume mark readonly %v: %v", req, err)


@ -130,8 +130,16 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti
glog.Errorf("Shut down Volume Server due to duplicate volume directories: %v", duplicateDir)
os.Exit(1)
}
volumeOptsChanged := false
if vs.store.GetPreallocate() != in.GetPreallocate() {
vs.store.SetPreallocate(in.GetPreallocate())
volumeOptsChanged = true
}
if in.GetVolumeSizeLimit() != 0 && vs.store.GetVolumeSizeLimit() != in.GetVolumeSizeLimit() {
vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
volumeOptsChanged = true
}
if volumeOptsChanged {
if vs.store.MaybeAdjustVolumeMax() {
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)


@ -150,8 +150,10 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
})
} else {
location = vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool {
_, found := location.FindEcVolume(needle.VolumeId(req.VolumeId))
return found
// The previous location.FindEcVolume filter was wrong: it could leave location nil, and the check is redundant
// _, found := location.FindEcVolume(needle.VolumeId(req.VolumeId))
// return found
return true
})
}
if location == nil {
@ -191,7 +193,6 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
return err
}
}
return nil
})
if err != nil {

weed/shell/command.go (new file, 20 changed lines)

@ -0,0 +1,20 @@
package shell
import "io"
type command interface {
Name() string
Help() string
Do([]string, *CommandEnv, io.Writer) error
HasTag(tag CommandTag) bool
}
var (
Commands = []command{}
)
type CommandTag string
const (
ResourceHeavy CommandTag = "resourceHeavy"
)
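
As a quick orientation for the new interface, below is an illustrative sketch (not part of the commit) of a command that opts into the ResourceHeavy tag and a helper that filters on it; the demoHeavyCommand type and runUnlessHeavy function are assumptions, while the actual pattern appears later in this diff: volume.check.disk returns true for ResourceHeavy and the master's embedded shell refuses to run such commands.

package shell

import (
	"fmt"
	"io"
)

// demoHeavyCommand is an illustrative command that marks itself ResourceHeavy,
// the same way volume.check.disk does further down in this diff.
type demoHeavyCommand struct{}

func (c *demoHeavyCommand) Name() string { return "demo.heavy" }
func (c *demoHeavyCommand) Help() string { return "example of a resource-heavy command" }

func (c *demoHeavyCommand) HasTag(tag CommandTag) bool {
	return tag == ResourceHeavy
}

func (c *demoHeavyCommand) Do(args []string, env *CommandEnv, w io.Writer) error {
	fmt.Fprintln(w, "doing heavy work...")
	return nil
}

// runUnlessHeavy mirrors how the master's embedded shell filters commands:
// resource-heavy commands are skipped instead of executed.
func runUnlessHeavy(c command, args []string, env *CommandEnv, w io.Writer) error {
	if c.HasTag(ResourceHeavy) {
		fmt.Fprintf(w, "%s is resource heavy and should not run on master\n", c.Name())
		return nil
	}
	return c.Do(args, env, w)
}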


@ -32,6 +32,10 @@ func (c *commandClusterCheck) Help() string {
`
}
func (c *commandClusterCheck) HasTag(CommandTag) bool {
return false
}
func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
clusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -35,6 +35,10 @@ func (c *commandClusterPs) Help() string {
`
}
func (c *commandClusterPs) HasTag(CommandTag) bool {
return false
}
func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
clusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -27,6 +27,10 @@ func (c *commandRaftServerAdd) Help() string {
`
}
func (c *commandRaftServerAdd) HasTag(CommandTag) bool {
return false
}
func (c *commandRaftServerAdd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
raftServerAddCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -26,6 +26,10 @@ func (c *commandRaftClusterPs) Help() string {
`
}
func (c *commandRaftClusterPs) HasTag(CommandTag) bool {
return false
}
func (c *commandRaftClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
raftClusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -27,6 +27,10 @@ func (c *commandRaftServerRemove) Help() string {
`
}
func (c *commandRaftServerRemove) HasTag(CommandTag) bool {
return false
}
func (c *commandRaftServerRemove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
raftServerAddCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -28,6 +28,10 @@ func (c *commandCollectionDelete) Help() string {
`
}
func (c *commandCollectionDelete) HasTag(CommandTag) bool {
return false
}
func (c *commandCollectionDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
colDeleteCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -23,6 +23,10 @@ func (c *commandCollectionList) Help() string {
return `list all collections`
}
func (c *commandCollectionList) HasTag(CommandTag) bool {
return false
}
type CollectionInfo struct {
FileCount float64
DeleteCount float64


@ -98,6 +98,10 @@ func (c *commandEcBalance) Help() string {
`
}
func (c *commandEcBalance) HasTag(CommandTag) bool {
return false
}
func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -37,6 +37,10 @@ func (c *commandEcDecode) Help() string {
`
}
func (c *commandEcDecode) HasTag(CommandTag) bool {
return false
}
func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
decodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeId := decodeCommand.Int("volumeId", 0, "the volume id")


@ -4,13 +4,14 @@ import (
"context"
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"io"
"math/rand"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/operation"
@ -55,6 +56,10 @@ func (c *commandEcEncode) Help() string {
`
}
func (c *commandEcEncode) HasTag(CommandTag) bool {
return false
}
func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@ -125,7 +130,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId,
// fmt.Printf("found ec %d shards on %v\n", vid, locations)
// mark the volume as readonly
err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false)
err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false)
if err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}


@ -55,6 +55,10 @@ func (c *commandEcRebuild) Help() string {
`
}
func (c *commandEcRebuild) HasTag(CommandTag) bool {
return false
}
func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fixCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -26,6 +26,10 @@ func (c *commandFsCat) Help() string {
`
}
func (c *commandFsCat) HasTag(CommandTag) bool {
return false
}
func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
path, err := commandEnv.parseUrl(findInputDirectory(args))


@ -28,6 +28,10 @@ func (c *commandFsCd) Help() string {
`
}
func (c *commandFsCd) HasTag(CommandTag) bool {
return false
}
func (c *commandFsCd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
path, err := commandEnv.parseUrl(findInputDirectory(args))


@ -46,6 +46,10 @@ func (c *commandFsConfigure) Help() string {
`
}
func (c *commandFsConfigure) HasTag(CommandTag) bool {
return false
}
func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fsConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -29,6 +29,10 @@ func (c *commandFsDu) Help() string {
`
}
func (c *commandFsDu) HasTag(CommandTag) bool {
return false
}
func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
path, err := commandEnv.parseUrl(findInputDirectory(args))


@ -27,6 +27,10 @@ func (c *commandFsLogPurge) Help() string {
`
}
func (c *commandFsLogPurge) HasTag(CommandTag) bool {
return false
}
func (c *commandFsLogPurge) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fsLogPurgeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
daysAgo := fsLogPurgeCommand.Uint("daysAgo", 365, "purge logs older than N days")


@ -33,6 +33,10 @@ func (c *commandFsLs) Help() string {
`
}
func (c *commandFsLs) HasTag(CommandTag) bool {
return false
}
func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
var isLongFormat, showHidden bool


@ -44,6 +44,10 @@ func (c *commandFsMergeVolumes) Help() string {
`
}
func (c *commandFsMergeVolumes) HasTag(CommandTag) bool {
return false
}
func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fsMergeVolumesCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@ -322,7 +326,7 @@ func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClie
if err != nil {
return err
}
_, err, _ = uploader.Upload(reader, &operation.UploadOption{
UploadUrl: uploadURL,
Filename: filename,


@ -30,6 +30,10 @@ func (c *commandFsMetaCat) Help() string {
`
}
func (c *commandFsMetaCat) HasTag(CommandTag) bool {
return false
}
func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
path, err := commandEnv.parseUrl(findInputDirectory(args))


@ -37,6 +37,10 @@ func (c *commandFsMetaChangeVolumeId) Help() string {
`
}
func (c *commandFsMetaChangeVolumeId) HasTag(CommandTag) bool {
return false
}
func (c *commandFsMetaChangeVolumeId) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fsMetaChangeVolumeIdCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -38,6 +38,10 @@ func (c *commandFsMetaLoad) Help() string {
`
}
func (c *commandFsMetaLoad) HasTag(CommandTag) bool {
return false
}
func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
if len(args) == 0 {


@ -30,6 +30,10 @@ func (c *commandFsMetaNotify) Help() string {
`
}
func (c *commandFsMetaNotify) HasTag(CommandTag) bool {
return false
}
func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
path, err := commandEnv.parseUrl(findInputDirectory(args))


@ -44,6 +44,10 @@ func (c *commandFsMetaSave) Help() string {
`
}
func (c *commandFsMetaSave) HasTag(CommandTag) bool {
return false
}
func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fsMetaSaveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -27,6 +27,10 @@ func (c *commandFsMkdir) Help() string {
`
}
func (c *commandFsMkdir) HasTag(CommandTag) bool {
return false
}
func (c *commandFsMkdir) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
path, err := commandEnv.parseUrl(findInputDirectory(args))


@ -34,6 +34,10 @@ func (c *commandFsMv) Help() string {
`
}
func (c *commandFsMv) HasTag(CommandTag) bool {
return false
}
func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
if len(args) != 2 {


@ -20,6 +20,10 @@ func (c *commandFsPwd) Help() string {
return `print out current directory`
}
func (c *commandFsPwd) HasTag(CommandTag) bool {
return false
}
func (c *commandFsPwd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fmt.Fprintf(writer, "%s\n", commandEnv.option.Directory)


@ -34,6 +34,10 @@ func (c *commandFsRm) Help() string {
`
}
func (c *commandFsRm) HasTag(CommandTag) bool {
return false
}
func (c *commandFsRm) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
isRecursive := false
ignoreRecursiveError := false


@ -28,6 +28,10 @@ func (c *commandFsTree) Help() string {
`
}
func (c *commandFsTree) HasTag(CommandTag) bool {
return false
}
func (c *commandFsTree) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
path, err := commandEnv.parseUrl(findInputDirectory(args))


@ -51,6 +51,10 @@ func (c *commandFsVerify) Help() string {
`
}
func (c *commandFsVerify) HasTag(CommandTag) bool {
return false
}
func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
c.env = commandEnv
c.writer = writer


@ -25,6 +25,10 @@ func (c *commandLock) Help() string {
`
}
func (c *commandLock) HasTag(CommandTag) bool {
return false
}
func (c *commandLock) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
commandEnv.locker.RequestLock(util.DetectedHostAddress())
@ -47,6 +51,10 @@ func (c *commandUnlock) Help() string {
`
}
func (c *commandUnlock) HasTag(CommandTag) bool {
return false
}
func (c *commandUnlock) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
commandEnv.locker.ReleaseLock()


@ -34,6 +34,10 @@ func (c *commandMountConfigure) Help() string {
`
}
func (c *commandMountConfigure) HasTag(CommandTag) bool {
return false
}
func (c *commandMountConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
mountConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -25,6 +25,10 @@ func (c *commandMqBalanceTopics) Help() string {
`
}
func (c *commandMqBalanceTopics) HasTag(CommandTag) bool {
return false
}
func (c *commandMqBalanceTopics) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
// find the broker balancer


@ -29,6 +29,10 @@ func (c *commandMqTopicConfigure) Help() string {
`
}
func (c *commandMqTopicConfigure) HasTag(CommandTag) bool {
return false
}
func (c *commandMqTopicConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
// parse parameters


@ -24,6 +24,10 @@ func (c *commandMqTopicDescribe) Help() string {
return `describe a topic`
}
func (c *commandMqTopicDescribe) HasTag(CommandTag) bool {
return false
}
func (c *commandMqTopicDescribe) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
// parse parameters
mqCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -25,6 +25,10 @@ func (c *commandMqTopicList) Help() string {
return `print out all topics`
}
func (c *commandMqTopicList) HasTag(CommandTag) bool {
return false
}
func (c *commandMqTopicList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
brokerBalancer, err := findBrokerBalancer(commandEnv)


@ -46,6 +46,10 @@ func (c *commandRemoteCache) Help() string {
`
}
func (c *commandRemoteCache) HasTag(CommandTag) bool {
return false
}
func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -48,6 +48,10 @@ func (c *commandRemoteConfigure) Help() string {
`
}
func (c *commandRemoteConfigure) HasTag(CommandTag) bool {
return false
}
var (
isAlpha = regexp.MustCompile(`^[A-Za-z][A-Za-z0-9]*$`).MatchString
)


@ -44,6 +44,10 @@ func (c *commandRemoteMetaSync) Help() string {
`
}
func (c *commandRemoteMetaSync) HasTag(CommandTag) bool {
return false
}
func (c *commandRemoteMetaSync) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
remoteMetaSyncCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -44,6 +44,10 @@ func (c *commandRemoteMount) Help() string {
`
}
func (c *commandRemoteMount) HasTag(CommandTag) bool {
return false
}
func (c *commandRemoteMount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -38,6 +38,10 @@ func (c *commandRemoteMountBuckets) Help() string {
`
}
func (c *commandRemoteMountBuckets) HasTag(CommandTag) bool {
return false
}
func (c *commandRemoteMountBuckets) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
remoteMountBucketsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -41,6 +41,10 @@ func (c *commandRemoteUncache) Help() string {
`
}
func (c *commandRemoteUncache) HasTag(CommandTag) bool {
return false
}
func (c *commandRemoteUncache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
remoteUncacheCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@ -165,12 +169,12 @@ func (ff *FileFilter) matches(entry *filer_pb.Entry) bool {
}
}
if *ff.minAge != -1 {
if entry.Attributes.Crtime + *ff.minAge > time.Now().Unix() {
if entry.Attributes.Crtime+*ff.minAge > time.Now().Unix() {
return false
}
}
if *ff.maxAge != -1 {
if entry.Attributes.Crtime + *ff.maxAge < time.Now().Unix() {
if entry.Attributes.Crtime+*ff.maxAge < time.Now().Unix() {
return false
}
}


@ -37,6 +37,10 @@ func (c *commandRemoteUnmount) Help() string {
`
}
func (c *commandRemoteUnmount) HasTag(CommandTag) bool {
return false
}
func (c *commandRemoteUnmount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -30,6 +30,10 @@ func (c *commandS3BucketCreate) Help() string {
`
}
func (c *commandS3BucketCreate) HasTag(CommandTag) bool {
return false
}
func (c *commandS3BucketCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -28,6 +28,10 @@ func (c *commandS3BucketDelete) Help() string {
`
}
func (c *commandS3BucketDelete) HasTag(CommandTag) bool {
return false
}
func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -27,6 +27,10 @@ func (c *commandS3BucketList) Help() string {
`
}
func (c *commandS3BucketList) HasTag(CommandTag) bool {
return false
}
func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -28,6 +28,10 @@ func (c *commandS3BucketQuota) Help() string {
`
}
func (c *commandS3BucketQuota) HasTag(CommandTag) bool {
return false
}
func (c *commandS3BucketQuota) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -29,6 +29,10 @@ func (c *commandS3BucketQuotaEnforce) Help() string {
`
}
func (c *commandS3BucketQuotaEnforce) HasTag(CommandTag) bool {
return false
}
func (c *commandS3BucketQuotaEnforce) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -51,6 +51,10 @@ func (c *commandS3CircuitBreaker) Help() string {
`
}
func (c *commandS3CircuitBreaker) HasTag(CommandTag) bool {
return false
}
func (c *commandS3CircuitBreaker) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
dir := s3_constants.CircuitBreakerConfigDir
file := s3_constants.CircuitBreakerConfigFile


@ -34,6 +34,10 @@ func (c *commandS3CleanUploads) Help() string {
`
}
func (c *commandS3CleanUploads) HasTag(CommandTag) bool {
return false
}
func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
uploadedTimeAgo := bucketCommand.Duration("timeAgo", 24*time.Hour, "created time before now. \"1.5h\" or \"2h45m\". Valid time units are \"m\", \"h\"")


@ -33,6 +33,10 @@ func (c *commandS3Configure) Help() string {
`
}
func (c *commandS3Configure) HasTag(CommandTag) bool {
return false
}
func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
s3ConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -64,6 +64,10 @@ func (c *commandVolumeBalance) Help() string {
`
}
func (c *commandVolumeBalance) HasTag(CommandTag) bool {
return false
}
func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)


@ -5,6 +5,12 @@ import (
"context"
"flag"
"fmt"
"io"
"math"
"net/http"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@ -13,11 +19,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"io"
"math"
"net/http"
"sync"
"time"
)
func init() {
@ -47,6 +48,10 @@ func (c *commandVolumeCheckDisk) Help() string {
`
}
func (c *commandVolumeCheckDisk) HasTag(tag CommandTag) bool {
return tag == ResourceHeavy
}
func (c *commandVolumeCheckDisk) getVolumeStatusFileCount(vid uint32, dn *master_pb.DataNodeInfo) (totalFileCount, deletedFileCount uint64) {
err := operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort)), c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
@ -141,23 +146,35 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
if *volumeId > 0 && replicas[0].info.Id != uint32(*volumeId) {
continue
}
slices.SortFunc(replicas, func(a, b *VolumeReplica) int {
// filter readonly replica
var writableReplicas []*VolumeReplica
for _, replica := range replicas {
if replica.info.ReadOnly {
fmt.Fprintf(writer, "skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id)
} else {
writableReplicas = append(writableReplicas, replica)
}
}
slices.SortFunc(writableReplicas, func(a, b *VolumeReplica) int {
return int(b.info.FileCount - a.info.FileCount)
})
for len(replicas) >= 2 {
a, b := replicas[0], replicas[1]
replicas = replicas[1:]
if a.info.ReadOnly || b.info.ReadOnly {
fmt.Fprintf(writer, "skipping readonly volume %d on %s and %s\n",
a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id)
continue
}
for len(writableReplicas) >= 2 {
a, b := writableReplicas[0], writableReplicas[1]
if !*slowMode && c.shouldSkipVolume(a, b, pulseTimeAtSecond, *syncDeletions, *verbose) {
// always choose the larger volume to be the source
writableReplicas = append(replicas[:1], writableReplicas[2:]...)
continue
}
if err := c.syncTwoReplicas(a, b, *applyChanges, *syncDeletions, *nonRepairThreshold, *verbose); err != nil {
fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
}
// always choose the larger volume to be the source
if a.info.FileCount > b.info.FileCount {
writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...)
} else {
writableReplicas = writableReplicas[1:]
}
}
}
@ -191,13 +208,15 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a
}
// find and make up the differences
if aHasChanges, err = doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil {
return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err)
aHasChanges, err1 := doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption)
bHasChanges, err2 := doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption)
if err1 != nil {
return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err1)
}
if bHasChanges, err = doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil {
return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err)
if err2 != nil {
return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err2)
}
return
return aHasChanges, bHasChanges, nil
}
func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, cutoffFromAtNs uint64, grpcDialOption grpc.DialOption) (hasChanges bool, err error) {
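The checkBoth change above runs the comparison in both directions before reporting, instead of returning on the first failure, so both change flags are preserved. A hedged, self-contained sketch of that pattern; the check function and server names are illustrative, not SeaweedFS APIs.

package main

import (
    "errors"
    "fmt"
)

// run a pairwise check in both directions, keep both change flags,
// and surface the first error only after both directions have run
func checkBothWays(check func(src, dst string) (bool, error), a, b string) (aChanged, bChanged bool, err error) {
    aChanged, errA := check(b, a) // fill a from b
    bChanged, errB := check(a, b) // fill b from a
    if errA != nil {
        return aChanged, bChanged, fmt.Errorf("check %s -> %s: %w", b, a, errA)
    }
    if errB != nil {
        return aChanged, bChanged, fmt.Errorf("check %s -> %s: %w", a, b, errB)
    }
    return aChanged, bChanged, nil
}

func main() {
    check := func(src, dst string) (bool, error) {
        if src == "bad" {
            return false, errors.New("source unreadable")
        }
        return true, nil
    }
    fmt.Println(checkBothWays(check, "vs1", "vs2"))
}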

View File

@ -35,6 +35,10 @@ func (c *commandVolumeConfigureReplication) Help() string {
`
}
func (c *commandVolumeConfigureReplication) HasTag(CommandTag) bool {
return false
}
func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, _ io.Writer) (err error) {
configureReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)

View File

@ -31,6 +31,10 @@ func (c *commandVolumeCopy) Help() string {
`
}
func (c *commandVolumeCopy) HasTag(CommandTag) bool {
return false
}
func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volCopyCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)

View File

@ -29,6 +29,10 @@ func (c *commandVolumeDelete) Help() string {
`
}
func (c *commandVolumeDelete) HasTag(CommandTag) bool {
return false
}
func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volDeleteCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)

View File

@ -32,6 +32,10 @@ func (c *commandVolumeDeleteEmpty) Help() string {
`
}
func (c *commandVolumeDeleteEmpty) HasTag(CommandTag) bool {
return false
}
func (c *commandVolumeDeleteEmpty) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volDeleteCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)

View File

@ -55,6 +55,10 @@ func (c *commandVolumeFixReplication) Help() string {
`
}
func (c *commandVolumeFixReplication) HasTag(tag CommandTag) bool {
return false && tag == ResourceHeavy // resource intensive only when deleting and checking with replicas.
}
func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@ -99,7 +103,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
replica := replicas[0]
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
switch {
case replicaPlacement.GetCopyCount() > len(replicas):
case replicaPlacement.GetCopyCount() > len(replicas) || !satisfyReplicaCurrentLocation(replicaPlacement, replicas):
underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
case isMisplaced(replicas, replicaPlacement):
misplacedVolumeIds = append(misplacedVolumeIds, vid)
@ -377,6 +381,27 @@ func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) {
})
}
func satisfyReplicaCurrentLocation(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica) bool {
existingDataCenters, existingRacks, _ := countReplicas(replicas)
if replicaPlacement.DiffDataCenterCount+1 > len(existingDataCenters) {
return false
}
if replicaPlacement.DiffRackCount+1 > len(existingRacks) {
return false
}
if replicaPlacement.SameRackCount > 0 {
foundSatisfyRack := false
for _, rackCount := range existingRacks {
if rackCount >= replicaPlacement.SameRackCount+1 {
foundSatisfyRack = true
}
}
return foundSatisfyRack
}
return true
}
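satisfyReplicaCurrentLocation validates the current replica spread against the three xyz replication digits (copies on different data centers, on different racks, and on the same rack). A hedged, self-contained sketch of the same three checks, with plain map counting standing in for countReplicas (whose exact rack keying is an assumption here):

package main

import "fmt"

type replica struct{ dc, rack string }

func satisfies(diffDC, diffRack, sameRack int, replicas []replica) bool {
    dcs, racks := map[string]int{}, map[string]int{}
    for _, r := range replicas {
        dcs[r.dc]++
        racks[r.dc+"/"+r.rack]++ // rack key assumed to be scoped by data center
    }
    if len(dcs) < diffDC+1 || len(racks) < diffRack+1 {
        return false
    }
    if sameRack > 0 {
        for _, n := range racks {
            if n >= sameRack+1 {
                return true
            }
        }
        return false
    }
    return true
}

func main() {
    // replication "110": one copy in another data center, one in another rack, none extra on the same rack
    fmt.Println(satisfies(1, 1, 0, []replica{{"dc1", "r1"}, {"dc1", "r1"}, {"dc2", "r2"}})) // true
    // replication "100": needs two data centers, but both replicas are in dc1
    fmt.Println(satisfies(1, 0, 0, []replica{{"dc1", "r1"}, {"dc1", "r2"}})) // false
}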
/*
if on an existing data node {
return false

View File

@ -438,3 +438,90 @@ func TestPickingMisplacedVolumeToDelete(t *testing.T) {
}
}
func TestSatisfyReplicaCurrentLocation(t *testing.T) {
var tests = []testcase{
{
name: "test 001",
replication: "001",
replicas: []*VolumeReplica{
{
location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
},
{
location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
},
},
expected: false,
},
{
name: "test 010",
replication: "010",
replicas: []*VolumeReplica{
{
location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
},
{
location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
},
},
expected: true,
},
{
name: "test 011",
replication: "011",
replicas: []*VolumeReplica{
{
location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
},
{
location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
},
{
location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn3"}},
},
},
expected: true,
},
{
name: "test 110",
replication: "110",
replicas: []*VolumeReplica{
{
location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
},
{
location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
},
{
location: &location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn3"}},
},
},
expected: true,
},
{
name: "test 100",
replication: "100",
replicas: []*VolumeReplica{
{
location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
},
{
location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
},
},
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
replicaPlacement, _ := super_block.NewReplicaPlacementFromString(tt.replication)
if satisfyReplicaCurrentLocation(replicaPlacement, tt.replicas) != tt.expected {
t.Errorf("%s: expect %v %v %+v",
tt.name, tt.expected, tt.replication, tt.replicas)
}
})
}
}

View File

@ -60,7 +60,7 @@ func (c *commandVolumeFsck) Name() string {
}
func (c *commandVolumeFsck) Help() string {
return `check all volumes to find entries not used by the filer
return `check all volumes to find entries not used by the filer. It is optional and resource intensive.
Important assumption!!!
the entire system is used by only one filer.
@ -79,6 +79,10 @@ func (c *commandVolumeFsck) Help() string {
`
}
func (c *commandVolumeFsck) HasTag(tag CommandTag) bool {
return tag == ResourceHeavy
}
func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@ -359,12 +363,12 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
needleVID := needle.VolumeId(volumeId)
if isReadOnlyReplicas[volumeId] {
err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, true)
err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, true, false)
if err != nil {
return fmt.Errorf("mark volume %d read/write: %v", volumeId, err)
}
fmt.Fprintf(c.writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, server)
defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, false)
defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, false, false)
fmt.Fprintf(c.writer, "marked %d on server %v writable for forced purge\n", volumeId, server)
}

View File

@ -30,6 +30,10 @@ func (c *commandGrow) Help() string {
`
}
func (c *commandGrow) HasTag(CommandTag) bool {
return false
}
func (c *commandGrow) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volumeVacuumCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)

View File

@ -39,6 +39,10 @@ func (c *commandVolumeList) Help() string {
`
}
func (c *commandVolumeList) HasTag(CommandTag) bool {
return false
}
func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volumeListCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)

View File

@ -3,9 +3,10 @@ package shell
import (
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"io"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
@ -27,6 +28,10 @@ func (c *commandVolumeMark) Help() string {
`
}
func (c *commandVolumeMark) HasTag(CommandTag) bool {
return false
}
func (c *commandVolumeMark) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volMarkCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@ -52,5 +57,5 @@ func (c *commandVolumeMark) Do(args []string, commandEnv *CommandEnv, writer io.
volumeId := needle.VolumeId(*volumeIdInt)
return markVolumeWritable(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, markWritable)
return markVolumeWritable(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, markWritable, true)
}

View File

@ -33,6 +33,10 @@ func (c *commandVolumeMount) Help() string {
`
}
func (c *commandVolumeMount) HasTag(CommandTag) bool {
return false
}
func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)

View File

@ -4,12 +4,13 @@ import (
"context"
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"io"
"log"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
@ -48,6 +49,10 @@ func (c *commandVolumeMove) Help() string {
`
}
func (c *commandVolumeMove) HasTag(CommandTag) bool {
return false
}
func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volMoveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@ -132,6 +137,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
shouldMarkWritable = true
_, readonlyErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
VolumeId: uint32(volumeId),
Persist: false,
})
return readonlyErr
}
@ -197,7 +203,7 @@ func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sour
})
}
func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable bool) (err error) {
func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable, persist bool) (err error) {
return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
if writable {
_, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
@ -206,16 +212,17 @@ func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId
} else {
_, err = volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
VolumeId: uint32(volumeId),
Persist: persist,
})
}
return err
})
}
func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable bool) error {
func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable, persist bool) error {
for _, location := range locations {
fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url)
if err := markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable); err != nil {
if err := markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable, persist); err != nil {
return err
}
}
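markVolumeWritable and markVolumeReplicasWritable now take a persist flag that is forwarded as VolumeMarkReadonlyRequest.Persist: the volume.mark shell command passes true so the state survives restarts, while temporary toggles (tier move, tier upload, fsck forced purge) pass false. A stubbed, self-contained sketch of the two call styles; the gRPC plumbing is omitted and the server address is made up.

package main

import "fmt"

// stub mirroring the new shape: markVolumeWritable(grpcDialOption, volumeId, server, writable, persist)
func markVolumeWritable(server string, volumeId uint32, writable, persist bool) error {
    fmt.Printf("volume %d on %s: writable=%v persist=%v\n", volumeId, server, writable, persist)
    return nil
}

func main() {
    // temporary toggle around a maintenance step: do not persist the read-only state
    _ = markVolumeWritable("vs1:8080", 7, false, false)
    defer markVolumeWritable("vs1:8080", 7, true, false)

    // operator-requested change (volume.mark): persist across volume server restarts
    _ = markVolumeWritable("vs1:8080", 7, true, true)
}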

View File

@ -46,6 +46,10 @@ func (c *commandVolumeServerEvacuate) Help() string {
`
}
func (c *commandVolumeServerEvacuate) HasTag(CommandTag) bool {
return false
}
func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)

View File

@ -35,6 +35,10 @@ func (c *commandVolumeServerLeave) Help() string {
`
}
func (c *commandVolumeServerLeave) HasTag(CommandTag) bool {
return false
}
func (c *commandVolumeServerLeave) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
vsLeaveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)

View File

@ -41,6 +41,10 @@ func (c *commandVolumeTierDownload) Help() string {
`
}
func (c *commandVolumeTierDownload) HasTag(CommandTag) bool {
return false
}
func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)

View File

@ -53,6 +53,10 @@ func (c *commandVolumeTierMove) Help() string {
`
}
func (c *commandVolumeTierMove) HasTag(CommandTag) bool {
return false
}
func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@ -234,14 +238,14 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
}
// mark all replicas as read only
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil {
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}
newAddress := pb.NewServerAddressFromDataNode(dst.dataNode)
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, newAddress, 5*time.Second, toDiskType.ReadableString(), ioBytePerSecond, true); err != nil {
// mark all replicas as writable
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil {
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true, false); err != nil {
glog.Errorf("mark volume %d as writable on %s: %v", vid, locations[0].Url, err)
}

View File

@ -4,10 +4,11 @@ import (
"context"
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"io"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/operation"
@ -55,6 +56,10 @@ func (c *commandVolumeTierUpload) Help() string {
`
}
func (c *commandVolumeTierUpload) HasTag(CommandTag) bool {
return false
}
func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@ -102,7 +107,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str
return fmt.Errorf("volume %d not found", vid)
}
err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, existingLocations, false)
err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, existingLocations, false, false)
if err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, existingLocations[0].Url, err)
}

View File

@ -34,6 +34,10 @@ func (c *commandVolumeUnmount) Help() string {
`
}
func (c *commandVolumeUnmount) HasTag(CommandTag) bool {
return false
}
func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volUnmountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)

View File

@ -27,6 +27,10 @@ func (c *commandVacuum) Help() string {
`
}
func (c *commandVacuum) HasTag(CommandTag) bool {
return false
}
func (c *commandVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
volumeVacuumCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)

View File

@ -26,6 +26,10 @@ func (c *commandDisableVacuum) Help() string {
`
}
func (c *commandDisableVacuum) HasTag(CommandTag) bool {
return false
}
func (c *commandDisableVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
if err = commandEnv.confirmIsLocked(args); err != nil {

View File

@ -26,6 +26,10 @@ func (c *commandEnableVacuum) Help() string {
`
}
func (c *commandEnableVacuum) HasTag(CommandTag) bool {
return false
}
func (c *commandEnableVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
if err = commandEnv.confirmIsLocked(args); err != nil {

View File

@ -6,7 +6,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"io"
"net/url"
"strconv"
"strings"
@ -38,16 +37,6 @@ type CommandEnv struct {
locker *exclusive_locks.ExclusiveLocker
}
type command interface {
Name() string
Help() string
Do([]string, *CommandEnv, io.Writer) error
}
var (
Commands = []command{}
)
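The command interface and the Commands slice are removed from command_env.go here; judging by the HasTag methods added to every command in this change, the relocated interface presumably also carries a tag query. The sketch below is an assumption, not the actual relocated definition, and the tag value is a placeholder.

package shellsketch

import "io"

// Assumed shape only: CommandTag, ResourceHeavy and CommandEnv are placeholders
// standing in for the real definitions elsewhere in the shell package.
type CommandTag string

const ResourceHeavy CommandTag = "resourceHeavy"

type CommandEnv struct{}

type command interface {
    Name() string
    Help() string
    HasTag(CommandTag) bool
    Do([]string, *CommandEnv, io.Writer) error
}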
func NewCommandEnv(options *ShellOptions) *CommandEnv {
ce := &CommandEnv{
env: make(map[string]string),

View File

@ -27,12 +27,14 @@ const (
Failed = "failed"
// filer handler
DirList = "dirList"
ContentSaveToFiler = "contentSaveToFiler"
AutoChunk = "autoChunk"
ChunkProxy = "chunkProxy"
ChunkAssign = "chunkAssign"
ChunkUpload = "chunkUpload"
DirList = "dirList"
ContentSaveToFiler = "contentSaveToFiler"
AutoChunk = "autoChunk"
ChunkProxy = "chunkProxy"
ChunkAssign = "chunkAssign"
ChunkUpload = "chunkUpload"
ChunkMerge = "chunkMerge"
ChunkDoUploadRetry = "chunkDoUploadRetry"
ChunkUploadRetry = "chunkUploadRetry"
ChunkAssignRetry = "chunkAssignRetry"

View File

@ -57,7 +57,8 @@ type ReadOption struct {
type Store struct {
MasterAddress pb.ServerAddress
grpcDialOption grpc.DialOption
volumeSizeLimit uint64 // read from the master
volumeSizeLimit uint64 // read from the master
preallocate atomic.Bool // read from the master
Ip string
Port int
GrpcPort int
@ -470,14 +471,16 @@ func (s *Store) HasVolume(i needle.VolumeId) bool {
return v != nil
}
func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error {
func (s *Store) MarkVolumeReadonly(i needle.VolumeId, persist bool) error {
v := s.findVolume(i)
if v == nil {
return fmt.Errorf("volume %d not found", i)
}
v.noWriteLock.Lock()
v.noWriteOrDelete = true
v.PersistReadOnly(true)
if persist {
v.PersistReadOnly(true)
}
v.noWriteLock.Unlock()
return nil
}
@ -607,6 +610,14 @@ func (s *Store) GetVolumeSizeLimit() uint64 {
return atomic.LoadUint64(&s.volumeSizeLimit)
}
func (s *Store) SetPreallocate(x bool) {
s.preallocate.Store(x)
}
func (s *Store) GetPreallocate() bool {
return s.preallocate.Load()
}
func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) {
volumeSizeLimit := s.GetVolumeSizeLimit()
if volumeSizeLimit == 0 {
@ -617,10 +628,15 @@ func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) {
if diskLocation.OriginalMaxVolumeCount == 0 {
currentMaxVolumeCount := atomic.LoadInt32(&diskLocation.MaxVolumeCount)
diskStatus := stats.NewDiskStatus(diskLocation.Directory)
unusedSpace := diskLocation.UnUsedSpace(volumeSizeLimit)
unclaimedSpaces := int64(diskStatus.Free) - int64(unusedSpace)
var unusedSpace uint64 = 0
unclaimedSpaces := int64(diskStatus.Free)
if !s.GetPreallocate() {
unusedSpace = diskLocation.UnUsedSpace(volumeSizeLimit)
unclaimedSpaces -= int64(unusedSpace)
}
volCount := diskLocation.VolumesLen()
maxVolumeCount := int32(volCount)
ecShardCount := diskLocation.EcShardCount()
maxVolumeCount := int32(volCount) + int32((ecShardCount+erasure_coding.DataShardsCount)/erasure_coding.DataShardsCount)
if unclaimedSpaces > int64(volumeSizeLimit) {
maxVolumeCount += int32(uint64(unclaimedSpaces)/volumeSizeLimit) - 1
}
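MaybeAdjustVolumeMax now skips subtracting the headroom of existing volumes when preallocation is enabled (preallocated .dat files already occupy their space on disk), and it reserves extra slots for EC shards. A hedged sketch of the arithmetic with made-up numbers; the constant and variable names are placeholders for the real Store and DiskLocation fields.

package main

import "fmt"

func main() {
    const dataShards = 10                       // stands in for erasure_coding.DataShardsCount
    volumeSizeLimit := uint64(30) << 30         // 30 GiB per volume
    free := uint64(500) << 30                   // free bytes reported by the disk status
    unusedInExistingVolumes := uint64(60) << 30 // headroom still available inside current volumes
    preallocate := false                        // whether preallocation is enabled (read from the master)
    volCount, ecShardCount := 8, 25

    unclaimed := int64(free)
    if !preallocate {
        // without preallocation, space already promised to growing volumes is not claimable
        unclaimed -= int64(unusedInExistingVolumes)
    }
    maxVolumeCount := int32(volCount) + int32((ecShardCount+dataShards)/dataShards)
    if unclaimed > int64(volumeSizeLimit) {
        maxVolumeCount += int32(uint64(unclaimed)/volumeSizeLimit) - 1
    }
    fmt.Println("adjusted max volume count:", maxVolumeCount) // 8 + 3 + (14 - 1) = 24
}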

View File

@ -14,7 +14,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
func CheckAndFixVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) {
func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) {
var indexSize int64
if indexSize, err = verifyIndexFileIntegrity(indexFile); err != nil {
return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), err)
@ -35,11 +35,7 @@ func CheckAndFixVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAt
}
}
if healthyIndexSize < indexSize {
glog.Warningf("CheckAndFixVolumeDataIntegrity truncate idx file %s from %d to %d", indexFile.Name(), indexSize, healthyIndexSize)
err = indexFile.Truncate(healthyIndexSize)
if err != nil {
glog.Warningf("CheckAndFixVolumeDataIntegrity truncate idx file %s from %d to %d: %v", indexFile.Name(), indexSize, healthyIndexSize, err)
}
return 0, fmt.Errorf("CheckVolumeDataIntegrity %s failed: index size %d differs from healthy size %d", indexFile.Name(), indexSize, healthyIndexSize)
}
return
}

View File

@ -135,7 +135,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
// capacity overloading.
if !v.HasRemoteFile() {
glog.V(0).Infof("checking volume data integrity for volume %d", v.Id)
if v.lastAppendAtNs, err = CheckAndFixVolumeDataIntegrity(v, indexFile); err != nil {
if v.lastAppendAtNs, err = CheckVolumeDataIntegrity(v, indexFile); err != nil {
v.noWriteOrDelete = true
glog.V(0).Infof("volumeDataIntegrityChecking failed %v", err)
}
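With the rename to CheckVolumeDataIntegrity, an over-long index file is no longer truncated in place; the mismatch is returned as an error and, as the load hunk above shows, the volume is flipped to no-write instead. A hedged sketch of that caller-side pattern with stubbed types (not the real Volume or glog APIs):

package main

import (
    "fmt"
    "log"
)

type volume struct {
    id              int
    lastAppendAtNs  uint64
    noWriteOrDelete bool
}

// stub standing in for CheckVolumeDataIntegrity: report the inconsistency instead of repairing it
func checkVolumeDataIntegrity(v *volume) (uint64, error) {
    return 0, fmt.Errorf("volume %d: index size differs from healthy size", v.id)
}

func main() {
    v := &volume{id: 7}
    var err error
    if v.lastAppendAtNs, err = checkVolumeDataIntegrity(v); err != nil {
        v.noWriteOrDelete = true // keep serving reads, refuse writes
        log.Printf("volume data integrity checking failed: %v", err)
    }
}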