kt offset #36
henzenvandijk commented Nov 3, 2016
- viewing of partition offsets, optionally filtered by topic and/or partition
- viewing of consumer group offsets, optionally filtered by topic and/or partition
- modification of consumer group offsets, optionally filtered by topic and/or partition
looks great @henzenvandijk!
i left some comments and will need to think about some of the duplication between topic & offset. have you considered what to do there?
The following syntax is supported for setConsumerOffsets:

    (oldest|newest)?(\d+)?
this is probably (oldest|newest|\d+), or?
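To illustrate the difference between the two patterns: with both groups optional, `(oldest|newest)?(\d+)?` also accepts the empty string and combined inputs such as `oldest42`, whereas the alternation accepts exactly one of the three forms. A minimal sketch, assuming the argument must be exactly one keyword or one number; `offsetSyntax` and `validOffsetArg` are hypothetical names, not part of the PR.

```go
package main

import (
	"fmt"
	"regexp"
)

// offsetSyntax is a hypothetical, anchored form of the pattern suggested in
// the review: exactly "oldest", "newest", or a run of digits.
var offsetSyntax = regexp.MustCompile(`^(oldest|newest|\d+)$`)

// validOffsetArg reports whether s is one of the three accepted forms.
func validOffsetArg(s string) bool {
	return offsetSyntax.MatchString(s)
}

func main() {
	for _, s := range []string{"oldest", "newest", "42", "", "oldest42", "-1"} {
		fmt.Printf("%q valid=%v\n", s, validOffsetArg(s))
	}
}
```

If combined forms were actually intended by the original pattern, this sketch would be too strict.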
    } else if config.offset.args.set == "oldest" {
        config.offset.setOldest = true
    } else if newOffset, err := strconv.Atoi(config.offset.args.set); err == nil {
        config.offset.set = int64(newOffset)
i think you can simplify this by using sarama's constants to identify newest/oldest?
I looked at this, but the sarama constants have int values of -1 and -2.
If you create a new consumer group its default offset for a topic is -1, so someone might try to set that value in order to reset a group offset.
This would conflict with the constant, because -1 equals the constant sarama.OffsetNewest (the opposite of what someone might be trying to achieve)
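The conflict described here can be sketched as follows. The constants are local copies of `sarama.OffsetNewest` (-1) and `sarama.OffsetOldest` (-2) so the example runs without the sarama dependency; `offsetRequest` and `parseSet` are hypothetical names, and keeping the keyword in a separate flag is only one way to avoid the collision, not necessarily what the PR does.

```go
package main

import (
	"fmt"
	"strconv"
)

// Local copies of sarama.OffsetNewest and sarama.OffsetOldest (assumption:
// copied here only so the sketch has no external dependency).
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// offsetRequest is a hypothetical type that keeps the keyword separate from
// the numeric value, so a literal -1 is never mistaken for "newest".
type offsetRequest struct {
	newest bool
	oldest bool
	value  int64
}

// parseSet turns the command-line argument into an offsetRequest.
func parseSet(arg string) (offsetRequest, error) {
	switch arg {
	case "newest":
		return offsetRequest{newest: true}, nil
	case "oldest":
		return offsetRequest{oldest: true}, nil
	}
	v, err := strconv.ParseInt(arg, 10, 64)
	if err != nil {
		return offsetRequest{}, fmt.Errorf("invalid offset %q: %v", arg, err)
	}
	return offsetRequest{value: v}, nil
}

func main() {
	literal, _ := parseSet("-1")
	keyword, _ := parseSet("newest")
	// The literal value is numerically identical to the sentinel...
	fmt.Println(literal.value == offsetNewest)
	// ...but the flag still tells the two requests apart.
	fmt.Println(literal.newest, keyword.newest)
}
```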
    }

    for _, t := range topics {
        if config.offset.args.topic == "" || len(config.offset.topic.FindString(t)) > 0 {
do you know about MatchString?
    getBroker := client.Coordinator
    getPartitions := client.Partitions
    getOffset := client.GetOffset
    getPartitionOffsetManager := om.ManagePartition
not sure, why are you assigning these?
    }

    for _, p := range partitions {
        if config.offset.partition == -1 || config.offset.partition == p {
i think this way you're not getting any feedback if the selected partition doesn't exist, right?
That is correct, but the partition flag is a filter, so -partition=<non-existing partition number> returns 0 results.
It works the same for topics: when adding -topic=something-non-existing, it will return 0 results.
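The filter semantics from this exchange can be sketched as below: -1 selects every partition, and a partition number that does not exist simply yields an empty result. The helper `filterPartitions` is hypothetical, and the notice printed to stderr for an empty result is only one possible way to address the reviewer's concern about feedback, not something the PR does.

```go
package main

import (
	"fmt"
	"os"
)

// filterPartitions keeps the partitions selected by want; -1 means "all".
// A value that matches no partition yields an empty slice, not an error.
func filterPartitions(partitions []int32, want int32) []int32 {
	var out []int32
	for _, p := range partitions {
		if want == -1 || want == p {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	partitions := []int32{0, 1, 2}
	for _, want := range []int32{-1, 1, 9} {
		selected := filterPartitions(partitions, want)
		if len(selected) == 0 {
			// Optional feedback (assumption): tell the user the filter matched nothing.
			fmt.Fprintf(os.Stderr, "no partitions matched partition=%v\n", want)
			continue
		}
		fmt.Println(selected)
	}
}
```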
    ) {
        po, err := getOffset(topic, partition, sarama.OffsetNewest)
        if err != nil {
            fmt.Fprintf(os.Stderr, "Failed to read offsets for [%s][%d] err=%v\n", topic, partition, err)
no need for the [..] i think, maybe Failed to read offsets topic=%#v partition=%v err=%v?
    Topic           string `json:"topic"`
    Partition       int32  `json:"partition"`
    PartitionOffset int64  `json:"partition-offset"`
    ConsumerOffset  *int64 `json:"consumer-offset,omitempty"`
why's this a pointer?
Because it prints the offsets and I wanted it to be printed when the int was 0 but omitted when it was actually empty (no consumer group offsets, if we are displaying only offsets for partitions).
The pointer allowed me to omit the empty ConsumerOffset, while still allowing me to print a zero value int
🤘
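The behaviour described in this thread can be reproduced with `encoding/json`: `omitempty` drops a nil pointer, but a non-nil pointer to 0 is still encoded. The struct fields and tags are taken from the diff; the type name `partitionOffsets` and the helper `encode` are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// partitionOffsets mirrors the fields shown in the diff (type name assumed).
type partitionOffsets struct {
	Topic           string `json:"topic"`
	Partition       int32  `json:"partition"`
	PartitionOffset int64  `json:"partition-offset"`
	ConsumerOffset  *int64 `json:"consumer-offset,omitempty"`
}

// encode marshals o to a JSON string.
func encode(o partitionOffsets) (string, error) {
	b, err := json.Marshal(o)
	return string(b), err
}

func main() {
	zero := int64(0)

	// No consumer group offset: the field is omitted entirely.
	s, _ := encode(partitionOffsets{Topic: "t", PartitionOffset: 10})
	fmt.Println(s)

	// Consumer group offset of 0: the field is kept because the pointer is non-nil.
	s, _ = encode(partitionOffsets{Topic: "t", PartitionOffset: 10, ConsumerOffset: &zero})
	fmt.Println(s)
}
```

With a plain `int64` field and `omitempty`, the second case would lose the `consumer-offset` key, since 0 is the zero value.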

    memberID = resp.MemberId
    generationID = resp.GenerationId
    return memberID, generationID
if you're using named return params, then you don't need to explicitly return them. but i don't see a benefit to using the named ones here - most of the exit points are os.Exit's
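A short sketch of the two styles the comment contrasts. The `joinResponse` type is a hypothetical stand-in for the response used in the PR, with field names taken from the diff; both functions return the same values.

```go
package main

import "fmt"

// joinResponse is a hypothetical stand-in for the response type in the diff.
type joinResponse struct {
	MemberId     string
	GenerationId int32
}

// idsNamed uses named results, so a bare return is enough.
func idsNamed(resp joinResponse) (memberID string, generationID int32) {
	memberID = resp.MemberId
	generationID = resp.GenerationId
	return
}

// idsPlain uses unnamed results and returns the values directly.
func idsPlain(resp joinResponse) (string, int32) {
	return resp.MemberId, resp.GenerationId
}

func main() {
	resp := joinResponse{MemberId: "member-1", GenerationId: 7}
	fmt.Println(idsNamed(resp))
	fmt.Println(idsPlain(resp))
}
```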
    }(out)

    // print to console
    go func() {
i think this way you have a chance that messages won't be printed. how about blocking on this for (rather than spawning a goroutine) and quitting it when you close the out channel (and close the out channel where you currently call wg.Done)?
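The suggestion can be sketched roughly as follows: the printing loop runs in the calling goroutine and ends when `out` is closed, so no message is dropped when the program exits. This variant keeps a `sync.WaitGroup` and closes the channel once all workers are done, which differs slightly from closing it directly where `wg.Done` is called; the `produce` helper and the message contents are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// produce starts n workers that each send one line, and closes the returned
// channel once every worker has finished.
func produce(n int) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			out <- fmt.Sprintf("partition=%d", id)
		}(i)
	}
	// Close out only after all senders are done, so the reader's loop ends.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	// Print to console: block here instead of spawning a goroutine, so main
	// cannot return before every message has been printed.
	count := 0
	for line := range produce(4) {
		fmt.Println(line)
		count++
	}
	fmt.Println("printed", count)
}
```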
thanks a lot @henzenvandijk
Manual fetch from upstream