Skip to content

Commit

Permalink
lint fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Sanket Sudake <[email protected]>
  • Loading branch information
sanketsudake committed Nov 23, 2023
1 parent 39e6c0e commit d774b7d
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 22 deletions.
2 changes: 0 additions & 2 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ linters:
- govet
- ineffassign
- staticcheck
# - structcheck
- typecheck
- unused
# Additional linters
Expand All @@ -18,7 +17,6 @@ linters:
- dogsled
- dupl
# - gosec
# - ifshort
- nilerr
- prealloc
# - revive
Expand Down
30 changes: 15 additions & 15 deletions aws-kinesis-http-connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type awsKinesisConnector struct {

// listShards get called every 30sec to get all the shards
func (conn *awsKinesisConnector) listShards() ([]*kinesis.Shard, error) {
//call DescribeStream to get updated shards
// call DescribeStream to get updated shards
stream, err := conn.client.DescribeStream(&kinesis.DescribeStreamInput{
StreamName: &conn.connectordata.Topic,
})
Expand All @@ -56,14 +56,14 @@ func (conn *awsKinesisConnector) findNewShards() {
ticker.Stop()
return
case <-ticker.C:
//check if new shards are available in every 30 seconds
// check if new shards are available in every 30 seconds
shardList, err := conn.listShards()
if err != nil {
return
}

for _, s := range shardList {
//send only new shards
// send only new shards
_, loaded := shards.LoadOrStore(*s.ShardId, s)
if !loaded {
conn.shardc <- s
Expand All @@ -81,7 +81,7 @@ func (conn *awsKinesisConnector) getIterator(shardID string, checkpoint string)
}

if checkpoint != "" {
//Start from, where we left
// Start from, where we left
params.StartingSequenceNumber = aws.String(checkpoint)
params.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeAfterSequenceNumber)
iteratorOutput, err := conn.client.GetShardIteratorWithContext(conn.ctx, params)
Expand All @@ -90,7 +90,7 @@ func (conn *awsKinesisConnector) getIterator(shardID string, checkpoint string)
}
return iteratorOutput, err
}
//Start from, oldest record in the shard
// Start from, oldest record in the shard
params.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeTrimHorizon)
iteratorOutput, err := conn.client.GetShardIteratorWithContext(conn.ctx, params)
if err != nil {
Expand All @@ -114,27 +114,27 @@ func (conn *awsKinesisConnector) getRecords(shardIterator *string) (*kinesis.Get

// Check if shards are closed, shards can be updated by using update-shard-count method
func isShardClosed(nextShardIterator, currentShardIterator *string) bool {
//No new iterator is present, means it is closed
// No new iterator is present, means it is closed
return nextShardIterator == nil || currentShardIterator == nextShardIterator
}

// scan each shards for any new records, when found call the passed func
func (conn *awsKinesisConnector) pullRecords(fn pullFunc) {
//checkpoints to identify how much read has happened
// checkpoints to identify how much read has happened
var checkpoints sync.Map
var wg sync.WaitGroup
//get called when any new shards are added
// get called when any new shards are added
for s := range conn.shardc {
//Start fresh
// Start fresh
checkpoints.Store(*s.ShardId, "")
wg.Add(1)
go func(shardID string) {
defer wg.Done()
//scan every 10 second
// scan every 10 second
scanTicker := time.NewTicker(10 * time.Second)
defer scanTicker.Stop()
for {
//do noting if shard got deleted
// do noting if shard got deleted
checkpoint, found := checkpoints.Load(shardID)
if !found {
conn.logger.Info("shard not found", zap.String("shardID", shardID))
Expand All @@ -158,7 +158,7 @@ func (conn *awsKinesisConnector) pullRecords(fn pullFunc) {
}

for _, r := range resp.Records {
//send records
// send records
err := fn(&record{r, shardID, resp.MillisBehindLatest})
checkpoints.Store(shardID, *r.SequenceNumber)
if err != nil {
Expand All @@ -168,7 +168,7 @@ func (conn *awsKinesisConnector) pullRecords(fn pullFunc) {
}
}
if isShardClosed(resp.NextShardIterator, iterator) {
//when shards got deleted, remove it from checkpoints
// when shards got deleted, remove it from checkpoints
if _, found := checkpoints.Load(shardID); found {
checkpoints.Delete(shardID)
return
Expand Down Expand Up @@ -315,10 +315,10 @@ func main() {
connectordata: connectordata,
logger: logger,
shardc: shardc,
maxRecords: 10, //Read maximum 10 records
maxRecords: 10, // Read maximum 10 records
}
logger.Info("Starting aws kinesis connector")
//Get the shards in shardc chan
// Get the shards in shardc chan
go func() {
conn.findNewShards()
cancel()
Expand Down
4 changes: 2 additions & 2 deletions aws-sqs-http-connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func parseURL(baseURL *url.URL, queueName string) (string, error) {

func (conn awsSQSConnector) consumeMessage() {
var maxNumberOfMessages = int64(10) // Process maximum 10 messages concurrently
var waitTimeSeconds = int64(5) //Wait 5 sec to process another message
var waitTimeSeconds = int64(5) // Wait 5 sec to process another message
var respQueueURL, errorQueueURL string
headers := http.Header{
"KEDA-Topic": {conn.connectordata.Topic},
Expand Down Expand Up @@ -90,7 +90,7 @@ func (conn awsSQSConnector) consumeMessage() {
if err != nil {
conn.errorHandler(errorQueueURL, err)
} else {
//Generating SQS Message attribute
// Generating SQS Message attribute
var sqsMessageAttValue = make(map[string]*sqs.MessageAttributeValue)
for k, v := range resp.Header {
for _, d := range v {
Expand Down
5 changes: 2 additions & 3 deletions kafka-http-connector/scram_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ package main
import (
"crypto/sha256"
"crypto/sha512"
"hash"

"github.com/xdg/scram"
)

var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
var SHA256 scram.HashGeneratorFcn = sha256.New
var SHA512 scram.HashGeneratorFcn = sha512.New

type XDGSCRAMClient struct {
*scram.Client
Expand Down

0 comments on commit d774b7d

Please sign in to comment.