diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index 48916c54af1..191e06175f1 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -59,6 +59,11 @@ paths: $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" name: swarm-act-history-address required: false + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false requestBody: required: true content: @@ -89,6 +94,11 @@ paths: $ref: "SwarmCommon.yaml#/components/schemas/SwarmEncryptedReference" required: true description: Grantee list reference + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false responses: "200": description: OK @@ -139,6 +149,11 @@ paths: $ref: "SwarmCommon.yaml#/components/parameters/SwarmDeferredUpload" name: swarm-deferred-upload required: false + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false requestBody: required: true content: @@ -306,6 +321,12 @@ paths: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false + description: Redundancy level for ACT encryption only requestBody: description: Chunk binary data that has to have at least 8 bytes. content: @@ -677,6 +698,12 @@ paths: summary: Pin the root hash with the given reference tags: - Pinning + parameters: + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false responses: "200": description: Pin already exists, so no operation @@ -885,6 +912,12 @@ paths: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false + description: Redundancy level for ACT encryption only requestBody: required: true description: The SOC binary data is composed of the span (8 bytes) and the at most 4KB payload. @@ -983,6 +1016,12 @@ paths: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false + description: Redundancy level for ACT encryption only responses: "201": description: Created @@ -1080,6 +1119,11 @@ paths: $ref: "SwarmCommon.yaml#/components/schemas/SwarmReference" required: true description: "Root hash of content (can be of any type: collection, file, chunk)" + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false responses: "200": description: Returns if the content is retrievable @@ -1109,6 +1153,11 @@ paths: $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId" name: swarm-postage-batch-id description: Postage batch to use for re-upload. If none is provided and the file was uploaded on the same node before, it will reuse the same batch. If not found, it will return error. If a new batch is provided, the chunks are stamped again with the new batch. + - in: header + schema: + $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" + name: swarm-redundancy-level + required: false responses: "200": description: OK diff --git a/pkg/api/accesscontrol.go b/pkg/api/accesscontrol.go index 1ae0fb2fe6e..06cce076a39 100644 --- a/pkg/api/accesscontrol.go +++ b/pkg/api/accesscontrol.go @@ -100,10 +100,11 @@ func (s *Service) actDecryptionHandler() func(h http.Handler) http.Handler { } headers := struct { - Timestamp *int64 `map:"Swarm-Act-Timestamp"` - Publisher *ecdsa.PublicKey `map:"Swarm-Act-Publisher"` - HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address"` - Cache *bool `map:"Swarm-Cache"` + Timestamp *int64 `map:"Swarm-Act-Timestamp"` + Publisher *ecdsa.PublicKey `map:"Swarm-Act-Publisher"` + HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address"` + Cache *bool `map:"Swarm-Cache"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -125,8 +126,14 @@ func (s *Service) actDecryptionHandler() func(h http.Handler) http.Handler { if headers.Cache != nil { cache = *headers.Cache } + + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + ctx := r.Context() - ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), redundancy.DefaultLevel) + ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), rLevel) reference, err := s.accesscontrol.DownloadHandler(ctx, ls, paths.Address, headers.Publisher, *headers.HistoryAddress, timestamp) if err != nil { logger.Debug("access control download failed", "error", err) @@ -157,9 +164,10 @@ func (s *Service) actEncryptionHandler( putter storer.PutterSession, reference swarm.Address, historyRootHash swarm.Address, + rLevel redundancy.Level, ) (swarm.Address, swarm.Address, error) { publisherPublicKey := &s.publicKey - ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), redundancy.DefaultLevel) + ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), rLevel) storageReference, historyReference, encryptedReference, err := s.accesscontrol.UploadHandler(ctx, ls, reference, publisherPublicKey, historyRootHash) if err != nil { return swarm.ZeroAddress, swarm.ZeroAddress, err @@ -193,7 +201,8 @@ func (s *Service) actListGranteesHandler(w http.ResponseWriter, r *http.Request) } headers := struct { - Cache *bool `map:"Swarm-Cache"` + Cache *bool `map:"Swarm-Cache"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -203,8 +212,14 @@ func (s *Service) actListGranteesHandler(w http.ResponseWriter, r *http.Request) if headers.Cache != nil { cache = *headers.Cache } + + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + publisher := &s.publicKey - ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), redundancy.DefaultLevel) + ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), rLevel) grantees, err := s.accesscontrol.Get(r.Context(), ls, publisher, paths.GranteesAddress) if err != nil { logger.Debug("could not get grantees", "error", err) @@ -239,11 +254,12 @@ func (s *Service) actGrantRevokeHandler(w http.ResponseWriter, r *http.Request) } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` - SwarmTag uint64 `map:"Swarm-Tag"` - Pin bool `map:"Swarm-Pin"` - Deferred *bool `map:"Swarm-Deferred-Upload"` - HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address" validate:"required"` + BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + SwarmTag uint64 `map:"Swarm-Tag"` + Pin bool `map:"Swarm-Pin"` + Deferred *bool `map:"Swarm-Deferred-Upload"` + HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address" validate:"required"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -255,6 +271,11 @@ func (s *Service) actGrantRevokeHandler(w http.ResponseWriter, r *http.Request) historyAddress = *headers.HistoryAddress } + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + var ( tag uint64 err error @@ -344,8 +365,8 @@ func (s *Service) actGrantRevokeHandler(w http.ResponseWriter, r *http.Request) granteeref := paths.GranteesAddress publisher := &s.publicKey - ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), redundancy.DefaultLevel) - gls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, granteeListEncrypt, redundancy.NONE), redundancy.DefaultLevel) + ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), rLevel) + gls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, granteeListEncrypt, redundancy.NONE), rLevel) granteeref, encryptedglref, historyref, actref, err := s.accesscontrol.UpdateHandler(ctx, ls, gls, granteeref, historyAddress, publisher, grantees.Addlist, grantees.Revokelist) if err != nil { logger.Debug("failed to update grantee list", "error", err) @@ -405,11 +426,12 @@ func (s *Service) actCreateGranteesHandler(w http.ResponseWriter, r *http.Reques } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` - SwarmTag uint64 `map:"Swarm-Tag"` - Pin bool `map:"Swarm-Pin"` - Deferred *bool `map:"Swarm-Deferred-Upload"` - HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address"` + BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + SwarmTag uint64 `map:"Swarm-Tag"` + Pin bool `map:"Swarm-Pin"` + Deferred *bool `map:"Swarm-Deferred-Upload"` + HistoryAddress *swarm.Address `map:"Swarm-Act-History-Address"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -421,6 +443,11 @@ func (s *Service) actCreateGranteesHandler(w http.ResponseWriter, r *http.Reques historyAddress = *headers.HistoryAddress } + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + var ( tag uint64 err error @@ -498,8 +525,8 @@ func (s *Service) actCreateGranteesHandler(w http.ResponseWriter, r *http.Reques } publisher := &s.publicKey - ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), redundancy.DefaultLevel) - gls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, granteeListEncrypt, redundancy.NONE), redundancy.DefaultLevel) + ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), rLevel) + gls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, granteeListEncrypt, redundancy.NONE), rLevel) granteeref, encryptedglref, historyref, actref, err := s.accesscontrol.UpdateHandler(ctx, ls, gls, swarm.ZeroAddress, historyAddress, publisher, list, nil) if err != nil { logger.Debug("failed to create grantee list", "error", err) diff --git a/pkg/api/bytes.go b/pkg/api/bytes.go index 4c1cd891df0..fe15623dffc 100644 --- a/pkg/api/bytes.go +++ b/pkg/api/bytes.go @@ -123,7 +123,7 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) { encryptedReference := reference historyReference := swarm.ZeroAddress if headers.Act { - encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress) + encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress, headers.RLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") diff --git a/pkg/api/bzz.go b/pkg/api/bzz.go index a27e3f813c5..66ec780530a 100644 --- a/pkg/api/bzz.go +++ b/pkg/api/bzz.go @@ -271,7 +271,7 @@ func (s *Service) fileUploadHandler( reference := manifestReference historyReference := swarm.ZeroAddress if act { - reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, historyAddress) + reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, historyAddress, rLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") @@ -406,7 +406,7 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV } ctx := r.Context() - ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), redundancy.DefaultLevel) + ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), rLevel) feedDereferenced := false ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, logger) diff --git a/pkg/api/chequebook.go b/pkg/api/chequebook.go index 33cd75d9767..219f9479089 100644 --- a/pkg/api/chequebook.go +++ b/pkg/api/chequebook.go @@ -22,7 +22,6 @@ import ( const ( errChequebookBalance = "cannot get chequebook balance" - errChequebookNoAmount = "did not specify amount" errChequebookNoWithdraw = "cannot withdraw" errChequebookNoDeposit = "cannot deposit" errChequebookInsufficientFunds = "insufficient funds" diff --git a/pkg/api/chunk.go b/pkg/api/chunk.go index a9ffcad5ae0..67edea3880a 100644 --- a/pkg/api/chunk.go +++ b/pkg/api/chunk.go @@ -14,6 +14,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/accesscontrol" "github.com/ethersphere/bee/v2/pkg/cac" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/soc" "github.com/ethersphere/bee/v2/pkg/storer" @@ -32,11 +33,12 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { logger := s.logger.WithName("post_chunk").Build() headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id"` - StampSig []byte `map:"Swarm-Postage-Stamp"` - SwarmTag uint64 `map:"Swarm-Tag"` - Act bool `map:"Swarm-Act"` - HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + BatchID []byte `map:"Swarm-Postage-Batch-Id"` + StampSig []byte `map:"Swarm-Postage-Stamp"` + SwarmTag uint64 `map:"Swarm-Tag"` + Act bool `map:"Swarm-Act"` + HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -187,7 +189,11 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) { reference := chunk.Address() historyReference := swarm.ZeroAddress if headers.Act { - reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress) + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress, rLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") diff --git a/pkg/api/dirs.go b/pkg/api/dirs.go index 58160790857..dae42bbd43b 100644 --- a/pkg/api/dirs.go +++ b/pkg/api/dirs.go @@ -104,7 +104,7 @@ func (s *Service) dirUploadHandler( encryptedReference := reference historyReference := swarm.ZeroAddress if act { - encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, historyAddress) + encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, historyAddress, rLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") diff --git a/pkg/api/feed.go b/pkg/api/feed.go index 313b04c831a..5f1f257e9b5 100644 --- a/pkg/api/feed.go +++ b/pkg/api/feed.go @@ -170,11 +170,12 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) { } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` - Pin bool `map:"Swarm-Pin"` - Deferred *bool `map:"Swarm-Deferred-Upload"` - Act bool `map:"Swarm-Act"` - HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + Pin bool `map:"Swarm-Pin"` + Deferred *bool `map:"Swarm-Deferred-Upload"` + Act bool `map:"Swarm-Act"` + HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -283,7 +284,11 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) { encryptedReference := ref historyReference := swarm.ZeroAddress if headers.Act { - encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, ref, headers.HistoryAddress) + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + encryptedReference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, ref, headers.HistoryAddress, rLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") diff --git a/pkg/api/pin.go b/pkg/api/pin.go index 696f5185150..b1e43704098 100644 --- a/pkg/api/pin.go +++ b/pkg/api/pin.go @@ -32,6 +32,19 @@ func (s *Service) pinRootHash(w http.ResponseWriter, r *http.Request) { return } + headers := struct { + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` + }{} + if response := s.mapStructure(r.Header, &headers); response != nil { + response("invalid header params", logger, w) + return + } + + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + has, err := s.storer.HasPin(paths.Reference) if err != nil { logger.Debug("pin root hash: has pin failed", "chunk_address", paths.Reference, "error", err) @@ -53,7 +66,7 @@ func (s *Service) pinRootHash(w http.ResponseWriter, r *http.Request) { } getter := s.storer.Download(true) - traverser := traversal.New(getter, s.storer.Cache(), redundancy.DefaultLevel) + traverser := traversal.New(getter, s.storer.Cache()) sem := semaphore.NewWeighted(100) var errTraverse error @@ -93,6 +106,7 @@ func (s *Service) pinRootHash(w http.ResponseWriter, r *http.Request) { }() return nil }, + rLevel, ) wg.Wait() diff --git a/pkg/api/pss.go b/pkg/api/pss.go index ef1c3a84d47..473fce3d8d0 100644 --- a/pkg/api/pss.go +++ b/pkg/api/pss.go @@ -24,8 +24,7 @@ import ( ) const ( - writeDeadline = 4 * time.Second // write deadline. should be smaller than the shutdown timeout on api close - targetMaxLength = 3 // max target length in bytes, in order to prevent grieving by excess computation + writeDeadline = 4 * time.Second // write deadline. should be smaller than the shutdown timeout on api close ) func (s *Service) pssPostHandler(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/api/soc.go b/pkg/api/soc.go index 85d9bf5aaa3..698bf8a252f 100644 --- a/pkg/api/soc.go +++ b/pkg/api/soc.go @@ -14,6 +14,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/accesscontrol" "github.com/ethersphere/bee/v2/pkg/cac" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/jsonhttp" "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/soc" @@ -47,10 +48,11 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id"` - StampSig []byte `map:"Swarm-Postage-Stamp"` - Act bool `map:"Swarm-Act"` - HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + BatchID []byte `map:"Swarm-Postage-Batch-Id"` + StampSig []byte `map:"Swarm-Postage-Stamp"` + Act bool `map:"Swarm-Act"` + HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -183,7 +185,11 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { reference := sch.Address() historyReference := swarm.ZeroAddress if headers.Act { - reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress) + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress, rLevel) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") diff --git a/pkg/api/stewardship.go b/pkg/api/stewardship.go index b11b5ea5a6c..962ba41991f 100644 --- a/pkg/api/stewardship.go +++ b/pkg/api/stewardship.go @@ -8,6 +8,7 @@ import ( "errors" "net/http" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/postage" storage "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/swarm" @@ -29,13 +30,19 @@ func (s *Service) stewardshipPutHandler(w http.ResponseWriter, r *http.Request) } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) return } + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + var ( batchID []byte err error @@ -57,7 +64,7 @@ func (s *Service) stewardshipPutHandler(w http.ResponseWriter, r *http.Request) return } - err = s.steward.Reupload(r.Context(), paths.Address, stamper) + err = s.steward.Reupload(r.Context(), paths.Address, stamper, rLevel) if err != nil { logger.Debug("re-upload failed", "chunk_address", paths.Address, "error", err) logger.Error(nil, "re-upload failed") @@ -91,7 +98,20 @@ func (s *Service) stewardshipGetHandler(w http.ResponseWriter, r *http.Request) return } - res, err := s.steward.IsRetrievable(r.Context(), paths.Address) + headers := struct { + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` + }{} + if response := s.mapStructure(r.Header, &headers); response != nil { + response("invalid header params", logger, w) + return + } + + rLevel := redundancy.PARANOID + if headers.RLevel != nil { + rLevel = *headers.RLevel + } + + res, err := s.steward.IsRetrievable(r.Context(), paths.Address, rLevel) if err != nil { logger.Debug("is retrievable check failed", "chunk_address", paths.Address, "error", err) logger.Error(nil, "is retrievable") diff --git a/pkg/file/redundancy/getter/getter.go b/pkg/file/redundancy/getter/getter.go index 39bf23f87c3..377019f5db8 100644 --- a/pkg/file/redundancy/getter/getter.go +++ b/pkg/file/redundancy/getter/getter.go @@ -244,7 +244,14 @@ func (g *decoder) runStrategy(s Strategy) error { c := make(chan error, len(m)) ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + completed := 0 + defer func() { + cancel() + remaining := len(m) - completed + for i := 0; i < remaining; i++ { + <-c + } + }() for _, i := range m { go func(i int) { @@ -252,7 +259,9 @@ func (g *decoder) runStrategy(s Strategy) error { }(i) } - for range c { + for completed < len(m) { + <-c + completed++ if g.fetchedCnt.Load() >= int32(g.shardCnt) { return nil } diff --git a/pkg/steward/mock/steward.go b/pkg/steward/mock/steward.go index b8cee8b3588..737ddf8b3d4 100644 --- a/pkg/steward/mock/steward.go +++ b/pkg/steward/mock/steward.go @@ -7,6 +7,7 @@ package mock import ( "context" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/swarm" ) @@ -18,14 +19,14 @@ type Steward struct { // Reupload implements steward.Interface Reupload method. // The given address is recorded. -func (s *Steward) Reupload(_ context.Context, addr swarm.Address, _ postage.Stamper) error { +func (s *Steward) Reupload(_ context.Context, addr swarm.Address, _ postage.Stamper, _ redundancy.Level) error { s.addr = addr return nil } // IsRetrievable implements steward.Interface IsRetrievable method. // The method always returns true. -func (s *Steward) IsRetrievable(_ context.Context, addr swarm.Address) (bool, error) { +func (s *Steward) IsRetrievable(_ context.Context, addr swarm.Address, _ redundancy.Level) (bool, error) { return addr.Equal(s.addr), nil } diff --git a/pkg/steward/steward.go b/pkg/steward/steward.go index 4272095f4ee..463d389814e 100644 --- a/pkg/steward/steward.go +++ b/pkg/steward/steward.go @@ -24,11 +24,11 @@ import ( type Interface interface { // Reupload root hash and all of its underlying // associated chunks to the network. - Reupload(context.Context, swarm.Address, postage.Stamper) error + Reupload(context.Context, swarm.Address, postage.Stamper, redundancy.Level) error // IsRetrievable checks whether the content // on the given address is retrievable. - IsRetrievable(context.Context, swarm.Address) (bool, error) + IsRetrievable(context.Context, swarm.Address, redundancy.Level) (bool, error) } type steward struct { @@ -41,8 +41,8 @@ type steward struct { func New(ns storer.NetStore, r retrieval.Interface, joinerPutter storage.Putter) Interface { return &steward{ netStore: ns, - traverser: traversal.New(ns.Download(true), joinerPutter, redundancy.DefaultLevel), - netTraverser: traversal.New(&netGetter{r}, joinerPutter, redundancy.DefaultLevel), + traverser: traversal.New(ns.Download(true), joinerPutter), + netTraverser: traversal.New(&netGetter{r}, joinerPutter), netGetter: r, } } @@ -52,7 +52,7 @@ func New(ns storer.NetStore, r retrieval.Interface, joinerPutter storage.Putter) // addresses and push every chunk individually to the network. // It assumes all chunks are available locally. It is therefore // advisable to pin the content locally before trying to reupload it. -func (s *steward) Reupload(ctx context.Context, root swarm.Address, stamper postage.Stamper) error { +func (s *steward) Reupload(ctx context.Context, root swarm.Address, stamper postage.Stamper, rLevel redundancy.Level) error { uploaderSession := s.netStore.DirectUpload() getter := s.netStore.Download(false) @@ -70,7 +70,7 @@ func (s *steward) Reupload(ctx context.Context, root swarm.Address, stamper post return uploaderSession.Put(ctx, c.WithStamp(stamp)) } - if err := s.traverser.Traverse(ctx, root, fn); err != nil { + if err := s.traverser.Traverse(ctx, root, fn, rLevel); err != nil { return errors.Join( fmt.Errorf("traversal of %s failed: %w", root.String(), err), uploaderSession.Cleanup(), @@ -81,12 +81,12 @@ func (s *steward) Reupload(ctx context.Context, root swarm.Address, stamper post } // IsRetrievable implements Interface.IsRetrievable method. -func (s *steward) IsRetrievable(ctx context.Context, root swarm.Address) (bool, error) { +func (s *steward) IsRetrievable(ctx context.Context, root swarm.Address, rLevel redundancy.Level) (bool, error) { fn := func(a swarm.Address) error { _, err := s.netGetter.RetrieveChunk(ctx, a, swarm.ZeroAddress) return err } - switch err := s.netTraverser.Traverse(ctx, root, fn); { + switch err := s.netTraverser.Traverse(ctx, root, fn, rLevel); { case errors.Is(err, storage.ErrNotFound): return false, nil case errors.Is(err, topology.ErrNotFound): diff --git a/pkg/steward/steward_test.go b/pkg/steward/steward_test.go index 4684c4713c4..32978265188 100644 --- a/pkg/steward/steward_test.go +++ b/pkg/steward/steward_test.go @@ -87,7 +87,7 @@ func TestSteward(t *testing.T) { } }() - err = s.Reupload(ctx, addr, stamper) + err = s.Reupload(ctx, addr, stamper, redundancy.PARANOID) if err != nil { t.Fatal(err) } @@ -104,7 +104,7 @@ func TestSteward(t *testing.T) { default: } - isRetrievable, err := s.IsRetrievable(ctx, addr) + isRetrievable, err := s.IsRetrievable(ctx, addr, redundancy.PARANOID) if err != nil { t.Fatal(err) } diff --git a/pkg/traversal/traversal.go b/pkg/traversal/traversal.go index 7bb3ade6ce9..2516be5301a 100644 --- a/pkg/traversal/traversal.go +++ b/pkg/traversal/traversal.go @@ -26,25 +26,24 @@ import ( // Traverser represents service which traverse through address dependent chunks. type Traverser interface { // Traverse iterates through each address related to the supplied one, if possible. - Traverse(context.Context, swarm.Address, swarm.AddressIterFunc) error + Traverse(context.Context, swarm.Address, swarm.AddressIterFunc, redundancy.Level) error } // New constructs for a new Traverser. -func New(getter storage.Getter, putter storage.Putter, rLevel redundancy.Level) Traverser { - return &service{getter: getter, putter: putter, rLevel: rLevel} +func New(getter storage.Getter, putter storage.Putter) Traverser { + return &service{getter: getter, putter: putter} } // service is implementation of Traverser using storage.Storer as its storage. type service struct { getter storage.Getter putter storage.Putter - rLevel redundancy.Level } // Traverse implements Traverser.Traverse method. -func (s *service) Traverse(ctx context.Context, addr swarm.Address, iterFn swarm.AddressIterFunc) error { +func (s *service) Traverse(ctx context.Context, addr swarm.Address, iterFn swarm.AddressIterFunc, rLevel redundancy.Level) error { processBytes := func(ref swarm.Address) error { - j, _, err := joiner.New(ctx, s.getter, s.putter, ref, s.rLevel) + j, _, err := joiner.New(ctx, s.getter, s.putter, ref, rLevel) if err != nil { return fmt.Errorf("traversal: joiner error on %q: %w", ref, err) } @@ -67,7 +66,7 @@ func (s *service) Traverse(ctx context.Context, addr swarm.Address, iterFn swarm } } - j, _, err := joiner.New(ctx, s.getter, s.putter, addr, s.rLevel) + j, _, err := joiner.New(ctx, s.getter, s.putter, addr, rLevel) if err != nil { return err } @@ -77,7 +76,7 @@ func (s *service) Traverse(ctx context.Context, addr swarm.Address, iterFn swarm // then the reference is likely a manifest reference. This is because manifest holds metadata // that points to the actual data file, and this metadata is assumed to be small - Less than or equal to swarm.ChunkSize. if j.Size() <= swarm.ChunkSize { - ls := loadsave.NewReadonly(s.getter, s.putter, s.rLevel) + ls := loadsave.NewReadonly(s.getter, s.putter, rLevel) switch mf, err := manifest.NewDefaultManifestReference(addr, ls); { case errors.Is(err, manifest.ErrInvalidManifestType): break diff --git a/pkg/traversal/traversal_test.go b/pkg/traversal/traversal_test.go index 2853e7961f6..07373a67a09 100644 --- a/pkg/traversal/traversal_test.go +++ b/pkg/traversal/traversal_test.go @@ -167,7 +167,7 @@ func TestTraversalBytes(t *testing.T) { t.Fatal(err) } - err = traversal.New(storerMock, storerMock, redundancy.DefaultLevel).Traverse(ctx, address, iter.Next) + err = traversal.New(storerMock, storerMock).Traverse(ctx, address, iter.Next, redundancy.DefaultLevel) if err != nil { t.Fatal(err) } @@ -293,7 +293,7 @@ func TestTraversalFiles(t *testing.T) { t.Fatal(err) } - err = traversal.New(storerMock, storerMock, redundancy.DefaultLevel).Traverse(ctx, address, iter.Next) + err = traversal.New(storerMock, storerMock).Traverse(ctx, address, iter.Next, redundancy.DefaultLevel) if err != nil { t.Fatal(err) } @@ -450,7 +450,7 @@ func TestTraversalManifest(t *testing.T) { t.Fatal(err) } - err = traversal.New(storerMock, storerMock, redundancy.DefaultLevel).Traverse(ctx, address, iter.Next) + err = traversal.New(storerMock, storerMock).Traverse(ctx, address, iter.Next, redundancy.DefaultLevel) if err != nil { t.Fatal(err) } @@ -488,7 +488,7 @@ func TestTraversalSOC(t *testing.T) { t.Fatal(err) } - err = traversal.New(store, store, 0).Traverse(ctx, sch.Address(), iter.Next) + err = traversal.New(store, store).Traverse(ctx, sch.Address(), iter.Next, redundancy.NONE) if err != nil { t.Fatal(err) }