Skip to content

Conversation

henzenvandijk
Copy link
Contributor

  • viewing of partition offsets, optionally filtered by topic and/or partition
  • viewing of consumer group offsets, optionally filtered by topic and/or partition
  • modification of consumer group offsets, optionally filtered by topic and/or partition

 - viewing of partition offsets, optionally filtered by topic and/or partition
 - viewing of consumer group offsets, optionally filtered by topic and/or partition
 - modification of consumer group offsets, optionally filtered by topic and/or partition
Copy link
Owner

@fgeller fgeller left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks great @henzenvandijk!

i left some comments and will need to think about some of the duplication between topic & offset. have you considered what to do there?


The following syntax is supported for setConsumerOffsets:

(oldest|newest)?(\d+)?
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is probably (oldest|newest|\d+) or?

} else if config.offset.args.set == "oldest" {
config.offset.setOldest = true
} else if newOffset, err := strconv.Atoi(config.offset.args.set); err == nil {
config.offset.set = int64(newOffset)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think you can simplify this by using sarama's constants to identify newest/oldest?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at this, but the sarama constants have int values of -1 and -2.

If you create a new consumer group its default offset for a topic is -1, so someone might try to set that value in order to reset a group offset.
This would conflict with the constant, because -1 equals the constant sarama.OffsetNewest (the opposite of what someone might be trying to achieve)

}

for _, t := range topics {
if config.offset.args.topic == "" || len(config.offset.topic.FindString(t)) > 0 {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you know about MatchString?

getBroker := client.Coordinator
getPartitions := client.Partitions
getOffset := client.GetOffset
getPartitionOffsetManager := om.ManagePartition
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure, why are you assigning these?

}

for _, p := range partitions {
if config.offset.partition == -1 || config.offset.partition == p {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this way you're not getting any feedback if the selected partition doesn't exist, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct but the partition flag is a filter, so -partition=<non-existing partition number> returns 0 results.

It works the same for topics, when adding -topic=something-non-existing, then it will return 0 results

) {
po, err := getOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to read offsets for [%s][%d] err=%v\n", topic, partition, err)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for the [..] i think, maybe Failed to read offsets topic=%#v partition=%v err=%v?

Topic string `json:"topic"`
Partition int32 `json:"partition"`
PartitionOffset int64 `json:"partition-offset"`
ConsumerOffset *int64 `json:"consumer-offset,omitempty"`
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why's this a pointer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it prints the offsets and I wanted it to be printed when the int was 0 but omitted when it was actually empty (no consumer group offsets, if we are displaying only offsets for partitions).

The pointer allowed me to omit the empty ConsumerOffset, while still allowing me to print a zero value int

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤘


memberID = resp.MemberId
generationID = resp.GenerationId
return memberID, generationID
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you're using named return params, then you don't need to explicitly return them. but i don't see a benefit to using the named ones here - most of the exit points are os.Exit's

}(out)

// print to console
go func() {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this way you have a chance that messages won't be printed. how about blocking on this for (rather than spawning a goroutine) and quitting it when you close the out channel (and close the out channel where you currently call wg.Done)?

@fgeller fgeller merged commit 3ba48f9 into fgeller:master Nov 7, 2016
@fgeller
Copy link
Owner

fgeller commented Nov 7, 2016

thanks a lot @henzenvandijk

@henzenvandijk henzenvandijk deleted the kt-offset branch November 7, 2016 03:41
rogpeppe pushed a commit to rogpeppe-contrib/kt that referenced this pull request Mar 23, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants