Skip to content

Commit

Permalink
add renewal for scanner
Browse files Browse the repository at this point in the history
  • Loading branch information
tanay-vakharia committed Dec 6, 2024
1 parent 74ec557 commit 8df1eda
Show file tree
Hide file tree
Showing 4 changed files with 389 additions and 12 deletions.
87 changes: 78 additions & 9 deletions hrpc/hrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,38 +218,48 @@ func TestNewScan(t *testing.T) {
stopb := []byte("100")
scan, err := NewScan(ctx, tableb)
if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil,
DefaultNumberOfRows) {
DefaultNumberOfRows, 0, false) {
t.Errorf("Scan1 didn't set attributes correctly.")
}
scan, err = NewScanRange(ctx, tableb, startb, stopb)
if err != nil || !confirmScanAttributes(ctx, scan, tableb, startb, stopb, nil, nil,
DefaultNumberOfRows) {
DefaultNumberOfRows, 0, false) {
t.Errorf("Scan2 didn't set attributes correctly.")
}
scan, err = NewScanStr(ctx, table)
if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil,
DefaultNumberOfRows) {
DefaultNumberOfRows, 0, false) {
t.Errorf("Scan3 didn't set attributes correctly.")
}
scan, err = NewScanRangeStr(ctx, table, start, stop)
if err != nil || !confirmScanAttributes(ctx, scan, tableb, startb, stopb, nil, nil,
DefaultNumberOfRows) {
DefaultNumberOfRows, 0, false) {
t.Errorf("Scan4 didn't set attributes correctly.")
}
scan, err = NewScanRange(ctx, tableb, startb, stopb, Families(fam), Filters(filter1))
if err != nil || !confirmScanAttributes(ctx, scan, tableb, startb, stopb, fam, filter1,
DefaultNumberOfRows) {
DefaultNumberOfRows, 0, false) {
t.Errorf("Scan5 didn't set attributes correctly.")
}
scan, err = NewScan(ctx, tableb, Filters(filter1), Families(fam))
if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, fam, filter1,
DefaultNumberOfRows) {
DefaultNumberOfRows, 0, false) {
t.Errorf("Scan6 didn't set attributes correctly.")
}
scan, err = NewScan(ctx, tableb, NumberOfRows(1))
if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil, 1) {
if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil, 1, 0, false) {
t.Errorf("Scan7 didn't set number of versions correctly")
}
scan, err = NewScan(ctx, tableb, RenewInterval(10*time.Second))
if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil,
DefaultNumberOfRows, 10*time.Second, false) {
t.Errorf("Scan8 didn't set renew correctly")
}
scan, err = NewScan(ctx, tableb, RenewalScan())
if err != nil || !confirmScanAttributes(ctx, scan, tableb, nil, nil, nil, nil,
DefaultNumberOfRows, 0, true) {
t.Errorf("Scan8 didn't set renew correctly")
}
}

func TestScanToProto(t *testing.T) {
Expand Down Expand Up @@ -286,6 +296,7 @@ func TestScanToProto(t *testing.T) {
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(false),
},
},
{ // explicitly set configurable attributes to default values
Expand Down Expand Up @@ -319,6 +330,7 @@ func TestScanToProto(t *testing.T) {
CacheBlocks: nil,
},
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(false),
},
},
{ // set configurable attributes to non-default values
Expand All @@ -331,6 +343,8 @@ func TestScanToProto(t *testing.T) {
MaxVersions(89),
CacheBlocks(!DefaultCacheBlocks),
TimeRangeUint64(1024, 1738),
RenewInterval(10*time.Second),
RenewalScan(),
)
return s
}(),
Expand All @@ -353,6 +367,7 @@ func TestScanToProto(t *testing.T) {
CacheBlocks: proto.Bool(!DefaultCacheBlocks),
},
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(true),
},
},
{ // test that pb.ScanRequest.Scan is nil when scanner id is specificed
Expand All @@ -378,6 +393,7 @@ func TestScanToProto(t *testing.T) {
ClientHandlesHeartbeats: proto.Bool(true),
Scan: nil,
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(false),
},
},
{ // set reversed attribute
Expand All @@ -398,6 +414,7 @@ func TestScanToProto(t *testing.T) {
Reversed: proto.Bool(true),
},
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(false),
},
},
{ // set scan attribute
Expand All @@ -424,6 +441,7 @@ func TestScanToProto(t *testing.T) {
},
},
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(false),
},
},
{ // scan key range
Expand All @@ -445,6 +463,7 @@ func TestScanToProto(t *testing.T) {
StopRow: stopRow,
},
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(false),
},
},
{ // set filters and families
Expand All @@ -467,6 +486,7 @@ func TestScanToProto(t *testing.T) {
Filter: pbFilter,
},
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(false),
}
}(),
},
Expand All @@ -487,6 +507,7 @@ func TestScanToProto(t *testing.T) {
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(false),
},
},
// set TrackScanMetrics
Expand All @@ -507,6 +528,51 @@ func TestScanToProto(t *testing.T) {
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(true),
Renew: proto.Bool(false),
}
}(),
},
// set RenewInterval, this shouldn't affect the protobuf
{
s: func() *Scan {
s, _ := NewScanStr(ctx, "", RenewInterval(10*time.Second))
return s
}(),
expProto: func() *pb.ScanRequest {
return &pb.ScanRequest{
Region: rs,
NumberOfRows: proto.Uint32(DefaultNumberOfRows),
CloseScanner: proto.Bool(false),
ClientHandlesPartials: proto.Bool(true),
ClientHandlesHeartbeats: proto.Bool(true),
Scan: &pb.Scan{
MaxResultSize: proto.Uint64(DefaultMaxResultSize),
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(false),
}
}(),
},
// set RenewalScan
{
s: func() *Scan {
s, _ := NewScanStr(ctx, "", RenewalScan())
return s
}(),
expProto: func() *pb.ScanRequest {
return &pb.ScanRequest{
Region: rs,
NumberOfRows: proto.Uint32(DefaultNumberOfRows),
CloseScanner: proto.Bool(false),
ClientHandlesPartials: proto.Bool(true),
ClientHandlesHeartbeats: proto.Bool(true),
Scan: &pb.Scan{
MaxResultSize: proto.Uint64(DefaultMaxResultSize),
TimeRange: &pb.TimeRange{},
},
TrackScanMetrics: proto.Bool(false),
Renew: proto.Bool(true),
}
}(),
},
Expand Down Expand Up @@ -1573,7 +1639,8 @@ func TestDeserializeCellBlocksScan(t *testing.T) {
}

func confirmScanAttributes(ctx context.Context, s *Scan, table, start, stop []byte,
fam map[string][]string, fltr filter.Filter, numberOfRows uint32) bool {
fam map[string][]string, fltr filter.Filter, numberOfRows uint32,
renewInterval time.Duration, renewalScan bool) bool {
if fltr == nil && s.filter != nil {
return false
}
Expand All @@ -1582,7 +1649,9 @@ func confirmScanAttributes(ctx context.Context, s *Scan, table, start, stop []by
bytes.Equal(s.StartRow(), start) &&
bytes.Equal(s.StopRow(), stop) &&
reflect.DeepEqual(s.families, fam) &&
s.numberOfRows == numberOfRows
s.numberOfRows == numberOfRows &&
s.renewInterval == renewInterval &&
s.renewalScan == renewalScan
}

func BenchmarkMutateToProtoWithNestedMaps(b *testing.B) {
Expand Down
56 changes: 54 additions & 2 deletions hrpc/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"math"
"time"

"github.com/tsuna/gohbase/pb"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -77,6 +78,9 @@ type Scan struct {

closeScanner bool
allowPartialResults bool

renewInterval time.Duration
renewalScan bool
}

// baseScan returns a Scan struct with default values set.
Expand All @@ -93,6 +97,8 @@ func baseScan(ctx context.Context, table []byte,
maxResultSize: DefaultMaxResultSize,
numberOfRows: DefaultNumberOfRows,
reversed: false,
renewInterval: 0 * time.Second,
renewalScan: false,
}
err := applyOptions(s, options...)
if err != nil {
Expand All @@ -104,10 +110,12 @@ func baseScan(ctx context.Context, table []byte,
func (s *Scan) String() string {
return fmt.Sprintf("Scan{Table=%q StartRow=%q StopRow=%q TimeRange=(%d, %d) "+
"MaxVersions=%d NumberOfRows=%d MaxResultSize=%d Familes=%v Filter=%v "+
"StoreLimit=%d StoreOffset=%d ScannerID=%d Close=%v}",
"StoreLimit=%d StoreOffset=%d ScannerID=%d Close=%v RenewInterval=%v"+
"RenewalScan=%v}",
s.table, s.startRow, s.stopRow, s.fromTimestamp, s.toTimestamp,
s.maxVersions, s.numberOfRows, s.maxResultSize, s.families, s.filter,
s.storeLimit, s.storeOffset, s.scannerID, s.closeScanner)
s.storeLimit, s.storeOffset, s.scannerID, s.closeScanner, s.renewInterval,
s.renewalScan)
}

// NewScan creates a scanner for the given table.
Expand Down Expand Up @@ -189,6 +197,18 @@ func (s *Scan) TrackScanMetrics() bool {
return s.trackScanMetrics
}

// RenewInterval returns the interval at which the scanner will be renewed
// which is usually lease timeout / 2 secs
func (s *Scan) RenewInterval() time.Duration {
return s.renewInterval
}

// RenewalScan returns whether this scan is to be used only a renewal request
// to hbase
func (s *Scan) RenewalScan() bool {
return s.renewalScan
}

// ToProto converts this Scan into a protobuf message
func (s *Scan) ToProto() proto.Message {
scan := &pb.ScanRequest{
Expand All @@ -201,6 +221,11 @@ func (s *Scan) ToProto() proto.Message {
// since we don't really time out our scans (unless context was cancelled)
ClientHandlesHeartbeats: proto.Bool(true),
TrackScanMetrics: &s.trackScanMetrics,
Renew: proto.Bool(false),
}
// Tells hbase whether this request is for scanner renewal
if s.renewalScan {
scan.Renew = &s.renewalScan
}
if s.scannerID != math.MaxUint64 {
scan.ScannerId = &s.scannerID
Expand Down Expand Up @@ -387,3 +412,30 @@ func Attribute(key string, val []byte) func(Call) error {
return nil
}
}

// RenewInterval is a an option for scan requests.
// Enables renewal of scanners at an interval to prevent timeout of scanners due to
// waiting/starvation
func RenewInterval(interval time.Duration) func(Call) error {
return func(g Call) error {
scan, ok := g.(*Scan)
if !ok {
return errors.New("'RenewInterval' option can only be used with Scan queries")
}
scan.renewInterval = interval
return nil
}
}

// RenewalScan is an option for scan requests.
// Indicates that this Scan request will be used for the renewal of a scanner only
func RenewalScan() func(Call) error {
return func(g Call) error {
scan, ok := g.(*Scan)
if !ok {
return errors.New("'RenewScan' option can only be used with Scan queries")
}
scan.renewalScan = true
return nil
}
}
Loading

0 comments on commit 8df1eda

Please sign in to comment.