feat: logs exporter #685


Merged

merged 25 commits into main from feat/data-ingestion on Jul 18, 2025

Conversation

gfyrag
Collaborator

@gfyrag gfyrag commented Feb 14, 2025

No description provided.

@gfyrag gfyrag requested a review from a team as a code owner February 14, 2025 16:00
@gfyrag gfyrag changed the base branch from main to feat/leadership February 14, 2025 16:01

coderabbitai bot commented Feb 14, 2025

Important

Review skipped

More than 25% of the files were skipped due to the max files limit, so the review was skipped to avoid producing a low-quality review.

140 of the 278 files are above the max files limit of 100. Please upgrade to the Pro plan to get higher limits.

You can disable this status message by setting reviews.review_status to false in the CodeRabbit configuration file.


@gfyrag gfyrag force-pushed the feat/data-ingestion branch 5 times, most recently from 54afbef to f5f8248 on February 19, 2025 17:15
@gfyrag gfyrag changed the base branch from feat/leadership to main February 19, 2025 17:15
@gfyrag gfyrag force-pushed the feat/data-ingestion branch 6 times, most recently from dc9d5af to 4ef6b8c on February 20, 2025 17:32

codecov bot commented Feb 20, 2025

Codecov Report

Attention: Patch coverage is 72.50782% with 615 lines in your changes missing coverage. Please review.

Project coverage is 80.52%. Comparing base (4abf93b) to head (b50704a).
Report is 2 commits behind head on main.

Files with missing lines                           | Patch % | Lines
internal/replication/manager.go                    | 67.92%  | 81 Missing and 21 partials ⚠️
internal/replication/controller_grpc_server.go     | 23.63%  | 78 Missing and 6 partials ⚠️
internal/replication/controller_grpc_client.go     | 36.17%  | 54 Missing and 6 partials ⚠️
internal/replication/drivers/registry.go           | 62.00%  | 26 Missing and 12 partials ⚠️
internal/replication/drivers/batcher.go            | 69.42%  | 28 Missing and 9 partials ⚠️
internal/storage/system/store.go                   | 72.35%  | 22 Missing and 12 partials ⚠️
internal/replication/drivers/clickhouse/driver.go  | 65.75%  | 17 Missing and 8 partials ⚠️
internal/replication/driver_facade.go              | 55.76%  | 22 Missing and 1 partial ⚠️
internal/replication/pipeline.go                   | 76.53%  | 20 Missing and 3 partials ⚠️
internal/storage/ledger/errors.go                  | 51.11%  | 22 Missing ⚠️
... and 33 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #685      +/-   ##
==========================================
- Coverage   82.91%   80.52%   -2.39%     
==========================================
  Files         145      186      +41     
  Lines        8240    10199    +1959     
==========================================
+ Hits         6832     8213    +1381     
- Misses       1082     1565     +483     
- Partials      326      421      +95     

☔ View full report in Codecov by Sentry.

@gfyrag gfyrag force-pushed the feat/data-ingestion branch 3 times, most recently from 065b173 to 995c122 on February 24, 2025 14:23

gitguardian bot commented Feb 24, 2025

️✅ There are no secrets present in this pull request anymore.

If these secrets were true positives and are still valid, we highly recommend that you revoke them. While these secrets were previously flagged, we no longer have a reference to the specific commits where they were detected. Once a secret has been leaked into a git repository, you should consider it compromised, even if it was deleted immediately. More information about the risks is available here.



@gfyrag gfyrag force-pushed the feat/data-ingestion branch 6 times, most recently from ab7ceeb to d3be2d9 on February 28, 2025 08:29
@gfyrag gfyrag force-pushed the feat/data-ingestion branch 3 times, most recently from 413465b to 11fdf35 on March 7, 2025 14:45

@cursor cursor bot left a comment


Bug: Replication Drivers Fail to Handle JSON Marshalling Errors

The Accept methods in the stdout, http, elasticsearch, and clickhouse replication drivers incorrectly handle JSON marshalling errors. If json.Marshal or json.MarshalIndent fails for a log, the method returns (nil, err) instead of providing per-item errors as required by the drivers.Driver interface contract.

internal/replication/drivers/stdout/driver.go#L29-L40

func (driver *Driver) Accept(_ context.Context, logs ...drivers.LogWithLedger) ([]error, error) {
	for _, log := range logs {
		data, err := json.MarshalIndent(log, "", " ")
		if err != nil {
			return nil, err
		}
		_, _ = fmt.Fprintln(driver.output, string(data))
	}
	return make([]error, len(logs)), nil
}
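
A minimal sketch of the per-item error handling the report asks for, reusing the stdout driver's shape from the snippet above (types and field names are taken from that snippet, not verified against the final PR):

func (driver *Driver) Accept(_ context.Context, logs ...drivers.LogWithLedger) ([]error, error) {
	itemErrors := make([]error, len(logs))
	for i, log := range logs {
		data, err := json.MarshalIndent(log, "", " ")
		if err != nil {
			// Record the marshalling failure for this item only,
			// instead of aborting the whole batch with (nil, err).
			itemErrors[i] = err
			continue
		}
		_, _ = fmt.Fprintln(driver.output, string(data))
	}
	return itemErrors, nil
}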

internal/replication/drivers/http/driver.go#L31-L54

func (c *Driver) Accept(ctx context.Context, logs ...drivers.LogWithLedger) ([]error, error) {
	buffer := bytes.NewBufferString("")
	err := json.NewEncoder(buffer).Encode(logs)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest(http.MethodPost, c.config.URL, buffer)
	if err != nil {
		return nil, err
	}
	req = req.WithContext(ctx)
	rsp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	if rsp.StatusCode < 200 || rsp.StatusCode > 299 {
		return nil, fmt.Errorf("invalid status code, expect something between 200 and 299, got %d", rsp.StatusCode)
	}
	return make([]error, len(logs)), nil
}

internal/replication/drivers/elasticsearch/driver.go#L45-L70

func (driver *Driver) Accept(ctx context.Context, logs ...drivers.LogWithLedger) ([]error, error) {
	bulk := driver.client.Bulk().Refresh("true")
	for _, log := range logs {
		data, err := json.Marshal(log.Data)
		if err != nil {
			return nil, errors.Wrap(err, "marshalling data")
		}
		doc := struct {
			ID      string          `json:"id"`
			Payload json.RawMessage `json:"payload"`
			Module  string          `json:"module"`
		}{
			ID: DocID{
				Ledger: log.Ledger,
				LogID:  *log.ID,
			}.String(),
			Payload: json.RawMessage(data),
			Module:  log.Ledger,
		}
		bulk.Add(
			elastic.NewBulkIndexRequest().
internal/replication/drivers/clickhouse/driver.go#L41-L70

func (c *Driver) Accept(ctx context.Context, logs ...drivers.LogWithLedger) ([]error, error) {
	batch, err := c.db.PrepareBatch(ctx, "insert into logs(ledger, id, type, date, data)")
	if err != nil {
		return nil, errors.Wrap(err, "failed to prepare batch")
	}
	for _, log := range logs {
		data, err := json.Marshal(log.Data)
		if err != nil {
			return nil, errors.Wrap(err, "marshalling data")
		}
		if err := batch.Append(
			log.Ledger,
			*log.ID,
			log.Type,
			// if no timezone is specified, clickhouse assume the timezone is its local timezone
			// since all our date are in UTC, we just need to pass +00:00 to clickhouse to inform it
			// see https://clickhouse.com/docs/integrations/go#complex-types
			log.Date.Format("2006-01-02 15:04:05.999999")+" +00:00",
			string(data),
		); err != nil {
			return nil, errors.Wrap(err, "appending item to the batch")
		}
	}
	return make([]error, len(logs)), errors.Wrap(batch.Send(), "failed to commit transaction")


@gfyrag gfyrag force-pushed the feat/data-ingestion branch from d3b55db to 715bd64 on July 17, 2025 11:17

@cursor cursor bot left a comment


Bug: Incorrect Parameter Causes Identical User Actions

The generator.Next() function call was incorrectly changed to pass the loop iteration count (iteration) instead of the virtual user identifier (vu). This semantic change causes all virtual users to generate identical action sequences, undermining the realism and effectiveness of load tests that require distinct user behaviors.

pkg/generate/set.go#L46-L47

action, err := generator.Next(iteration)
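
A one-line sketch of the suggested fix, assuming the enclosing loop still has the virtual user identifier vu in scope as described above:

// Pass the virtual user id so each VU gets its own action sequence,
// rather than the shared iteration counter.
action, err := generator.Next(vu)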



Bug: Incorrect Error Logging Method Usage

The logging.Logger.Error() method is incorrectly used with Printf-style formatting arguments in internal/replication/manager.go when starting pipelines. This method expects a single string argument; Errorf() should be used for formatted logging.

internal/replication/manager.go#L370-L371

if _, err := m.startPipeline(ctx, pipeline); err != nil {
	logging.FromContext(ctx).Error("starting pipeline %s: %s", pipeline.ID, err)

internal/replication/manager.go#L420-L421

if _, err := m.startPipeline(ctx, *pipeline); err != nil {
	logging.FromContext(ctx).Error("starting pipeline %s: %s", pipeline.ID, err)
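
A sketch of the change the report suggests, assuming the logging package exposes a Printf-style Errorf alongside Error (an assumption, not verified here):

if _, err := m.startPipeline(ctx, *pipeline); err != nil {
	// Errorf applies the format verbs instead of treating the message as a plain string.
	logging.FromContext(ctx).Errorf("starting pipeline %s: %s", pipeline.ID, err)
}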



Bug: Batch Validation Error Message Mismatch

The Batching.Validate method incorrectly returns an error message stating "flushBytes must be greater than 0" when MaxItems is less than 0. The error message should refer to MaxItems.

internal/replication/drivers/batcher.go#L170-L174

func (b *Batching) Validate() error {
	if b.MaxItems < 0 {
		return errors.New("flushBytes must be greater than 0")
	}
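
A sketch of the corrected message; only the wording changes, the check itself stays the same, and the exact phrasing below is an assumption rather than the wording adopted in the PR:

func (b *Batching) Validate() error {
	if b.MaxItems < 0 {
		// The condition guards MaxItems, so the error should name maxItems, not flushBytes.
		return errors.New("maxItems must be greater than or equal to 0")
	}
	// ... remaining validations unchanged ...
	return nil
}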



@cursor cursor bot left a comment


Bug: Exporter ID Shadowing Causes Pipeline Stop Failure

Variable shadowing in the DeleteExporter method causes a bug where the loop variable id shadows the method parameter id (exporter ID). As a result, the comparison config.pipeline.ExporterID == id incorrectly uses the pipeline ID instead of the intended exporter ID, preventing pipelines associated with the exporter from being stopped. Renaming the loop variable to pipelineID would resolve this.

internal/replication/manager.go#L329-L340

defer m.mu.Unlock()
driver, ok := m.drivers[id]
if ok {
	for id, config := range m.pipelines {
		if config.pipeline.ExporterID == id {
			if err := m.stopPipeline(ctx, id); err != nil {
				return fmt.Errorf("stopping pipeline: %w", err)
			}
		}
	}
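
A sketch of the rename the report suggests, so the comparison uses the exporter id passed to DeleteExporter rather than the pipeline map key (based on the snippet above, not on the final PR):

if ok {
	for pipelineID, config := range m.pipelines {
		// id is now the DeleteExporter parameter (the exporter id), no longer shadowed.
		if config.pipeline.ExporterID == id {
			if err := m.stopPipeline(ctx, pipelineID); err != nil {
				return fmt.Errorf("stopping pipeline: %w", err)
			}
		}
	}
}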



Bug: Error Handling Fails with Nil Checks

The updated error handling condition response.HTTPMeta.Response.StatusCode == http.StatusBadRequest && response.V2BulkResponse.ErrorCode != nil introduces two issues: a potential nil pointer dereference if response.V2BulkResponse is nil, and incorrect error handling where 400 Bad Request responses might be ignored if response.V2BulkResponse.ErrorCode is nil.

pkg/generate/generator.go#L151-L152

if response.HTTPMeta.Response.StatusCode == http.StatusBadRequest && response.V2BulkResponse.ErrorCode != nil {
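
One nil-safe way to restructure the condition, as a sketch only: the handling inside the branches and the pointer type of ErrorCode are assumptions, since the surrounding code is not shown in the snippet.

if response.HTTPMeta.Response.StatusCode == http.StatusBadRequest {
	// Guard the pointer chain before dereferencing, and do not silently
	// ignore a 400 whose ErrorCode happens to be nil.
	if response.V2BulkResponse == nil || response.V2BulkResponse.ErrorCode == nil {
		return errors.New("bulk request failed with status 400")
	}
	return fmt.Errorf("bulk request failed with error code %v", *response.V2BulkResponse.ErrorCode)
}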



@cursor cursor bot left a comment


Bug: Batcher Send Failure Causes Nil Pointer Panic

In the Accept method, if b.batcher.Send() fails for a log, the corresponding entry in the operations slice remains nil. The subsequent loop then attempts to call operation.Wait() on these nil operations, leading to a nil pointer panic.

internal/replication/drivers/batcher.go#L25-L43

func (b *Batcher) Accept(ctx context.Context, logs ...LogWithLedger) ([]error, error) {
	itemsErrors := make([]error, len(logs))
	operations := make(batcher.Operations[LogWithLedger, error], len(logs))
	for ind, log := range logs {
		ret, err := b.batcher.Send(ctx, log)
		if err != nil {
			itemsErrors[ind] = fmt.Errorf("failed to send log to the batcher: %w", err)
			continue
		}
		operations[ind] = ret
	}
	for ind, operation := range operations {
		if _, err := operation.Wait(ctx); err != nil {
			itemsErrors[ind] = fmt.Errorf("failure while waiting for operation completion: %w", err)
			continue
		}
	}
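
A sketch of a guard for entries that were never filled because Send failed; it assumes the operation values are nilable (a pointer or interface type), which the report's nil-pointer description suggests:

for ind, operation := range operations {
	if operation == nil {
		// Send already failed for this log; its error is recorded above.
		continue
	}
	if _, err := operation.Wait(ctx); err != nil {
		itemsErrors[ind] = fmt.Errorf("failure while waiting for operation completion: %w", err)
		continue
	}
}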


@gfyrag gfyrag added this pull request to the merge queue Jul 18, 2025
Merged via the queue into main with commit 5221873 Jul 18, 2025
8 of 11 checks passed
@gfyrag gfyrag deleted the feat/data-ingestion branch July 18, 2025 07:54