-
Notifications
You must be signed in to change notification settings - Fork 124
feat: logs exporter #685
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: logs exporter #685
Conversation
Important Review skippedMore than 25% of the files skipped due to max files limit. The review is being skipped to prevent a low-quality review. 140 files out of 278 files are above the max files limit of 100. Please upgrade to Pro plan to get higher limits. You can disable this status message by setting the ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
54afbef
to
f5f8248
Compare
dc9d5af
to
4ef6b8c
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #685 +/- ##
==========================================
- Coverage 82.91% 80.52% -2.39%
==========================================
Files 145 186 +41
Lines 8240 10199 +1959
==========================================
+ Hits 6832 8213 +1381
- Misses 1082 1565 +483
- Partials 326 421 +95 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
065b173
to
995c122
Compare
️✅ There are no secrets present in this pull request anymore.If these secrets were true positive and are still valid, we highly recommend you to revoke them. 🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request. |
ab7ceeb
to
d3be2d9
Compare
413465b
to
11fdf35
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Replication Drivers Fail to Handle JSON Marshalling Errors
The Accept
methods in the stdout
, http
, elasticsearch
, and clickhouse
replication drivers incorrectly handle JSON marshalling errors. If json.Marshal
or json.MarshalIndent
fails for a log, the method returns (nil, err)
instead of providing per-item errors as required by the drivers.Driver
interface contract.
internal/replication/drivers/stdout/driver.go#L29-L40
ledger/internal/replication/drivers/stdout/driver.go
Lines 29 to 40 in d3b55db
func (driver *Driver) Accept(_ context.Context, logs ...drivers.LogWithLedger) ([]error, error) { | |
for _, log := range logs { | |
data, err := json.MarshalIndent(log, "", " ") | |
if err != nil { | |
return nil, err | |
} | |
_, _ = fmt.Fprintln(driver.output, string(data)) | |
} | |
return make([]error, len(logs)), nil | |
} |
internal/replication/drivers/http/driver.go#L31-L54
ledger/internal/replication/drivers/http/driver.go
Lines 31 to 54 in d3b55db
func (c *Driver) Accept(ctx context.Context, logs ...drivers.LogWithLedger) ([]error, error) { | |
buffer := bytes.NewBufferString("") | |
err := json.NewEncoder(buffer).Encode(logs) | |
if err != nil { | |
return nil, err | |
} | |
req, err := http.NewRequest(http.MethodPost, c.config.URL, buffer) | |
if err != nil { | |
return nil, err | |
} | |
req = req.WithContext(ctx) | |
rsp, err := c.httpClient.Do(req) | |
if err != nil { | |
return nil, err | |
} | |
if rsp.StatusCode < 200 || rsp.StatusCode > 299 { | |
return nil, fmt.Errorf("invalid status code, expect something between 200 and 299, got %d", rsp.StatusCode) | |
} | |
return make([]error, len(logs)), nil | |
} |
internal/replication/drivers/elasticsearch/driver.go#L45-L70
ledger/internal/replication/drivers/elasticsearch/driver.go
Lines 45 to 70 in d3b55db
func (driver *Driver) Accept(ctx context.Context, logs ...drivers.LogWithLedger) ([]error, error) { | |
bulk := driver.client.Bulk().Refresh("true") | |
for _, log := range logs { | |
data, err := json.Marshal(log.Data) | |
if err != nil { | |
return nil, errors.Wrap(err, "marshalling data") | |
} | |
doc := struct { | |
ID string `json:"id"` | |
Payload json.RawMessage `json:"payload"` | |
Module string `json:"module"` | |
}{ | |
ID: DocID{ | |
Ledger: log.Ledger, | |
LogID: *log.ID, | |
}.String(), | |
Payload: json.RawMessage(data), | |
Module: log.Ledger, | |
} | |
bulk.Add( | |
elastic.NewBulkIndexRequest(). |
internal/replication/drivers/clickhouse/driver.go#L41-L70
ledger/internal/replication/drivers/clickhouse/driver.go
Lines 41 to 70 in d3b55db
func (c *Driver) Accept(ctx context.Context, logs ...drivers.LogWithLedger) ([]error, error) { | |
batch, err := c.db.PrepareBatch(ctx, "insert into logs(ledger, id, type, date, data)") | |
if err != nil { | |
return nil, errors.Wrap(err, "failed to prepare batch") | |
} | |
for _, log := range logs { | |
data, err := json.Marshal(log.Data) | |
if err != nil { | |
return nil, errors.Wrap(err, "marshalling data") | |
} | |
if err := batch.Append( | |
log.Ledger, | |
*log.ID, | |
log.Type, | |
// if no timezone is specified, clickhouse assume the timezone is its local timezone | |
// since all our date are in UTC, we just need to pass +00:00 to clickhouse to inform it | |
// see https://clickhouse.com/docs/integrations/go#complex-types | |
log.Date.Format("2006-01-02 15:04:05.999999")+" +00:00", | |
string(data), | |
); err != nil { | |
return nil, errors.Wrap(err, "appending item to the batch") | |
} | |
} | |
return make([]error, len(logs)), errors.Wrap(batch.Send(), "failed to commit transaction") |
Was this report helpful? Give feedback by reacting with 👍 or 👎
d3b55db
to
715bd64
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Incorrect Parameter Causes Identical User Actions
The generator.Next()
function call was incorrectly changed to pass the loop iteration count (iteration
) instead of the virtual user identifier (vu
). This semantic change causes all virtual users to generate identical action sequences, undermining the realism and effectiveness of load tests that require distinct user behaviors.
pkg/generate/set.go#L46-L47
Lines 46 to 47 in 8e150f3
action, err := generator.Next(iteration) |
Bug: Incorrect Error Logging Method Usage
The logging.Logger.Error()
method is incorrectly used with Printf
-style formatting arguments in internal/replication/manager.go
when starting pipelines. This method expects a single string argument; Errorf()
should be used for formatted logging.
internal/replication/manager.go#L370-L371
ledger/internal/replication/manager.go
Lines 370 to 371 in 8e150f3
if _, err := m.startPipeline(ctx, pipeline); err != nil { | |
logging.FromContext(ctx).Error("starting pipeline %s: %s", pipeline.ID, err) |
internal/replication/manager.go#L420-L421
ledger/internal/replication/manager.go
Lines 420 to 421 in 8e150f3
if _, err := m.startPipeline(ctx, *pipeline); err != nil { | |
logging.FromContext(ctx).Error("starting pipeline %s: %s", pipeline.ID, err) |
Bug: Batch Validation Error Message Mismatch
The Batching.Validate
method incorrectly returns an error message stating "flushBytes must be greater than 0" when MaxItems
is less than 0. The error message should refer to MaxItems
.
internal/replication/drivers/batcher.go#L170-L174
ledger/internal/replication/drivers/batcher.go
Lines 170 to 174 in 8e150f3
func (b *Batching) Validate() error { | |
if b.MaxItems < 0 { | |
return errors.New("flushBytes must be greater than 0") | |
} | |
Was this report helpful? Give feedback by reacting with 👍 or 👎
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Exporter ID Shadowing Causes Pipeline Stop Failure
Variable shadowing in the DeleteExporter
method causes a bug where the loop variable id
shadows the method parameter id
(exporter ID). As a result, the comparison config.pipeline.ExporterID == id
incorrectly uses the pipeline ID instead of the intended exporter ID, preventing pipelines associated with the exporter from being stopped. Renaming the loop variable to pipelineID
would resolve this.
internal/replication/manager.go#L329-L340
ledger/internal/replication/manager.go
Lines 329 to 340 in 068b00e
defer m.mu.Unlock() | |
driver, ok := m.drivers[id] | |
if ok { | |
for id, config := range m.pipelines { | |
if config.pipeline.ExporterID == id { | |
if err := m.stopPipeline(ctx, id); err != nil { | |
return fmt.Errorf("stopping pipeline: %w", err) | |
} | |
} | |
} | |
Bug: Error Handling Fails with Nil Checks
The updated error handling condition response.HTTPMeta.Response.StatusCode == http.StatusBadRequest && response.V2BulkResponse.ErrorCode != nil
introduces two issues: a potential nil pointer dereference if response.V2BulkResponse
is nil, and incorrect error handling where 400 Bad Request
responses might be ignored if response.V2BulkResponse.ErrorCode
is nil.
pkg/generate/generator.go#L151-L152
ledger/pkg/generate/generator.go
Lines 151 to 152 in 068b00e
if response.HTTPMeta.Response.StatusCode == http.StatusBadRequest && response.V2BulkResponse.ErrorCode != nil { |
Was this report helpful? Give feedback by reacting with 👍 or 👎
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Batcher Send Failure Causes Nil Pointer Panic
In the Accept
method, if b.batcher.Send()
fails for a log, the corresponding entry in the operations
slice remains nil. The subsequent loop then attempts to call operation.Wait()
on these nil operations, leading to a nil pointer panic.
internal/replication/drivers/batcher.go#L25-L43
ledger/internal/replication/drivers/batcher.go
Lines 25 to 43 in b50704a
func (b *Batcher) Accept(ctx context.Context, logs ...LogWithLedger) ([]error, error) { | |
itemsErrors := make([]error, len(logs)) | |
operations := make(batcher.Operations[LogWithLedger, error], len(logs)) | |
for ind, log := range logs { | |
ret, err := b.batcher.Send(ctx, log) | |
if err != nil { | |
itemsErrors[ind] = fmt.Errorf("failed to send log to the batcher: %w", err) | |
continue | |
} | |
operations[ind] = ret | |
} | |
for ind, operation := range operations { | |
if _, err := operation.Wait(ctx); err != nil { | |
itemsErrors[ind] = fmt.Errorf("failure while waiting for operation completion: %w", err) | |
continue | |
} | |
} |
Was this report helpful? Give feedback by reacting with 👍 or 👎
No description provided.