Skip to content

Conversation

leite08
Copy link
Member

@leite08 leite08 commented Aug 28, 2025

Dependencies

Description

  • Pass output prefix as param to FhirToCsvTransform
  • Use core's impl to put messages into SQS

Testing

...WIP

  • Local
    • [Indicate how you tested this, on local or staging]
    • ...
  • Staging
    • testing step 1
    • testing step 2
  • Sandbox
    • testing step 1
    • testing step 2
  • Production
    • testing step 1
    • testing step 2

Release Plan

[How does the changes on this PR impact/interact with the existing environment (database, configs, secrets, FFs, api contracts, etc.)?
Consider creating 2+ PRs if we need to ship those changes in a staged way]

[This is the release plan for production]

[You should execute the exact same steps when releasing to staging to validate it works]

[Add and remove items below accordingly]

  • ⚠️ This contains a DB migration
  • Maintenance window scheduled/created at Checkly (if needed)
  • Execute this on ,
    • step1
    • step2
  • Added to monthly product update (deprecates a feature that needs to be communicated with customers)
  • Added to monthly product update (introduce a feature that would be useful customers)
  • Upstream dependencies are met/released
  • Release NPM packages
  • Fern Definition Updated
  • Release Fern SDKs
  • FFs have been set in Staging, Production, and Sandbox
  • Happy-path E2E test created checking new FF flow
  • No dependencies between API and Infra that will cause errors during deployment
  • FHIR Integration Test ran with validation
  • [action n-1]
  • [action n]
  • Merge this

Summary by CodeRabbit

  • New Features

    • Bulk FHIR-to-CSV processing for faster, scalable batch runs.
    • Feature flag to enable incremental ingestion per customer.
    • Consolidated bundles now include ADT-sourced resources.
  • Improvements

    • Improved deduplication and per-patient processing for reliability.
    • Standardized output prefixes/locations for CSV exports.
  • Refactor

    • Migrated from single-run to bulk/prefix-based flows.
  • Infrastructure

    • Split pipeline into Transform and Bulk Lambdas with a dedicated queue and updated environment config.

Copy link

linear bot commented Aug 28, 2025

Copy link

coderabbitai bot commented Aug 28, 2025

Walkthrough

Replaces single-run FHIR→CSV with a bulk/prefix-based flow: adds bulk request/handler types and factory, renames filename helpers to bulk variants, updates lambdas/config/infra to split bulk vs transform, introduces feature-flagged incremental-ingestion stub, and updates data-transformation and API route to use outputPrefix.

Changes

Cohort / File(s) Summary of changes
API route switched to bulk
packages/api/src/routes/internal/analytics-platform/index.ts
Route now uses bulk handler import (buildFhirToCsvBulkHandler), requires outputPrefix (built via buildFhirToCsvBulkJobPrefix), removes inputBundle handling, adds requestLogger middleware and TODO note.
Core bulk handler modules
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk.ts, .../fhir-to-csv-bulk-cloud.ts, .../fhir-to-csv-bulk-direct.ts, .../fhir-to-csv-bulk-factory.ts, .../fhir-to-csv-transform.ts
Adds ProcessFhirToCsvBulkRequest and FhirToCsvBulkHandler; implements bulk cloud/direct handlers and factory; cloud handler uses bulk FIFO queue URL, deduplicationId=patientId; transform caller now requires outputPrefix and always sends OUTPUT_PREFIX.
Legacy non-bulk removal
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv/fhir-to-csv.ts, .../fhir-to-csv-factory.ts
Removes old ProcessFhirToCsvRequest, FhirToCsvHandler, and the legacy factory.
Filename helpers → bulk variants
packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts, packages/core/src/command/analytics-platform/merge-csvs/index.ts, packages/utils/src/analytics-platform/2-merge-csvs.ts
Renames/introduces bulk prefix builders/parsers (base/job/patient prefixes, parse helpers) and updates imports/usages to bulk variants.
Utils: trigger switched to bulk handler
packages/utils/src/analytics-platform/1-fhir-to-csv.ts
Replaces per-patient SQS send with local FhirToCsvBulkCloud instance call processFhirToCsv({ jobId, cxId, patientId, outputPrefix }); removes direct SQS client usage and queue env var reads.
Data-transformation (Python) bulk inputs
packages/data-transformation/fhir-to-csv/main.py, .../src/utils/file.py
transform_and_upload_data now requires output_file_prefix (removes input_bundle and prefix builder util); handler reads OUTPUT_PREFIX; S3 keys built from file basenames.
Lambdas schema & entry
packages/lambdas/src/analytics-platform/fhir-to-csv-bulk.ts, packages/lambdas/src/shared/analytics-platform.ts
Bulk lambda entry now uses FhirToCsvBulkDirect; shared zod schema adds required outputPrefix.
Infra: split bulk vs transform, assets & env
packages/infra/lib/analytics-platform/analytics-platform-stack.ts, .../types.ts, packages/infra/lib/api-stack.ts, packages/infra/lib/api-stack/api-service.ts
Introduces separate Bulk and Transform lambdas and a dedicated FIFO queue; renames exported assets to fhirToCsvBulk*; updates settings/types to expose fhirToCsvBulk; propagates FHIR_TO_CSV_BULK_QUEUE_URL env and grants send permissions to bulk queue.
Consolidation + incremental-ingestion stub
packages/core/src/command/consolidated/consolidated-create.ts, packages/core/src/command/analytics-platform/incremental-ingestion.ts
Adds ADT-sourced resources to consolidated bundle; after storing bundle invokes feature-flag-gated ingestPatientIntoAnalyticsPlatform stub (logs and early-exits unless enabled).
Feature-flag additions
packages/core/src/command/feature-flags/domain-ffs.ts, .../ffs-on-dynamodb.ts, .../types.ts
Adds analyticsIncrementalIngestion to schema/defaults and provides getCxsEnabledForAnalyticsIncrementalIngestion() and isAnalyticsIncrementalIngestionEnabledForCx(cxId) helpers.
Config accessor rename
packages/core/src/util/config.ts
Renames getFhirToCsvQueueurl("")getFhirToCsvBulkQueueurl("") and reads FHIR_TO_CSV_BULK_QUEUE_URL.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant API as API Route
  participant Utils as Utils / Caller
  participant Core as Core Bulk Handler
  participant SQS as SQS FIFO Queue
  participant BulkLambda as Lambda: FHIR-to-CSV Bulk
  participant Transform as Lambda: FHIR-to-CSV Transform
  participant S3 as S3 (analytics platform)

  API->>Utils: HTTP -> trigger job (cxId, jobId, patientId)
  Utils->>Core: processFhirToCsv({ cxId, jobId, patientId, outputPrefix })
  Core->>SQS: SendMessage(JSON payload) (GroupId=patientId, DedupId=patientId)
  SQS-->>BulkLambda: Invoke with message payload
  BulkLambda->>Transform: Invoke Transform Lambda with { cxId, jobId, patientId, OUTPUT_PREFIX }
  Transform->>S3: Upload CSVs under OUTPUT_PREFIX
  Transform-->>BulkLambda: Done
  BulkLambda-->>SQS: Ack / delete message
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch eng-743-fhir-inferno-lambda-more-flexible

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore or @coderabbit ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@leite08 leite08 changed the base branch from develop to eng-743-bulk-ingest August 28, 2025 16:28
@leite08 leite08 mentioned this pull request Aug 28, 2025
8 tasks
@leite08 leite08 changed the title ENG-xxx ...WIP ENG-743 ...WIP Aug 28, 2025
@leite08 leite08 force-pushed the eng-743-fhir-inferno-lambda-more-flexible branch 2 times, most recently from 068699a to a1a395e Compare September 1, 2025 18:35
Base automatically changed from eng-743-bulk-ingest to develop September 2, 2025 18:14
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
packages/data-transformation/fhir-to-csv/main.py (2)

47-52: Fix exception type and 404 handling for S3 download_file

boto3 raises botocore.exceptions.ClientError; checking only s3_client.exceptions.ClientError will miss it.

Apply:

@@
-import snowflake.connector
+import snowflake.connector
+from botocore.exceptions import ClientError
@@
-        except s3_client.exceptions.ClientError as e:
-            if e.response['Error']['Code'] == '404':
+        except ClientError as e:
+            code = e.response.get('Error', {}).get('Code')
+            if code in ('404', 'NoSuchKey'):
                 print(f"Bundle {bundle_key} not found in input bucket {input_bucket}")
                 raise ValueError("Bundle not found") from e
             else:
                 raise e

121-124: Avoid sys.exit in Lambda handler; return instead

Returning is the safe/idiomatic way to end execution in AWS Lambda.

Apply:

-    if len(output_bucket_and_file_keys_and_table_names) < 1:
-        print("No files were uploaded")
-        exit(0)
+    if len(output_bucket_and_file_keys_and_table_names) < 1:
+        print("No files were uploaded")
+        return []
packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts (1)

31-38: Make table-name parsing robust to both old and new file-name schemes

Handle “tmp_fhir-to-csv_output_cx_patient

.csv” and plain “
.csv”.

Apply:

 export function parseTableNameFromFhirToCsvBulkFileKey(key: string): string {
-  // e.g.: snowflake/fhir-to-csv/cx=cx-id/f2c=2025-08-08T02-18-56/pt=patient-id/_tmp_fhir-to-csv_output_cx-id_patient-id_condition.csv
-  const tableName = key.split("/").pop()?.split("_").slice(6).join("_").split(".")[0];
-  if (!tableName) {
+  // Supports:
+  // 1) .../_tmp_fhir-to-csv_output_<cx>_<patient>_<table>.csv
+  // 2) .../<table>.csv
+  const fileName = key.split("/").pop();
+  if (!fileName) {
+    throw new MetriportError(`Failed to parse tableName from fhirToCsvFileKey`, undefined, { key });
+  }
+  const base = fileName.replace(/\.csv$/i, "");
+  const legacyPrefix = "_tmp_fhir-to-csv_output_";
+  const tableName = base.startsWith(legacyPrefix)
+    ? base.split("_").slice(6).join("_")
+    : base;
+  if (!tableName || tableName.length < 1) {
     throw new MetriportError(`Failed to parse tableName from fhirToCsvFileKey`, undefined, { key });
   }
   return tableName;
 }
🧹 Nitpick comments (25)
packages/lambdas/src/shared/analytics-platform.ts (2)

3-9: Tighten validation: make schema strict, normalize null→undefined for inputBundle, and constrain outputPrefix.

This reduces footguns (unknown keys, empty/whitespace prefixes, pathy values) and aligns with our “no null inside the app” guideline.

Add helper schema near the imports:

const outputPrefixSchema = z
  .string()
  .min(1, "outputPrefix cannot be empty")
  .refine(s => s === s.trim(), "outputPrefix must not have leading/trailing whitespace")
  .refine(s => !s.startsWith("/"), 'outputPrefix must not start with "/"')
  .refine(s => !s.includes(".."), 'outputPrefix must not contain ".."')
  .refine(s => !s.includes("\\"), "outputPrefix must not contain backslashes");

Apply to the object:

 export const fhirToCsvSchema = z.object({
   cxId: z.string(),
   jobId: z.string(),
   patientId: z.string(),
-  inputBundle: z.string().optional(),
-  outputPrefix: z.string(),
-});
+  inputBundle: z.preprocess(v => (v === null ? undefined : v), z.string().optional()),
+  outputPrefix: outputPrefixSchema,
+}).strict();

3-9: Export the inferred type for consistency across services.

Keeps callers in sync with schema changes.

export type FhirToCsvMessage = z.infer<typeof fhirToCsvSchema>;
packages/core/src/command/feature-flags/ffs-on-dynamodb.ts (1)

223-227: Back-compat: merge persisted flags with defaults to auto-fill new keys

If an older DDB record lacks analyticsIncrementalIngestion, callers will see undefined for that key. Merge persisted values with initialFeatureFlags on read to guarantee presence of new keys without changing write/update flows.

Apply this diff inside ddbItemToDbRecord:

-  const featureFlags = featureFlagsRaw
-    ? typeof featureFlagsRaw === "string"
-      ? ffDatastoreSchema.parse(JSON.parse(featureFlagsRaw))
-      : featureFlagsRaw
-    : getInitialFeatureFlags();
+  const parsed =
+    featureFlagsRaw && typeof featureFlagsRaw === "string"
+      ? JSON.parse(featureFlagsRaw)
+      : featureFlagsRaw;
+  const featureFlags = parsed ? { ...initialFeatureFlags, ...parsed } : getInitialFeatureFlags();
packages/core/src/command/feature-flags/domain-ffs.ts (1)

331-334: Rename copy-paste var for clarity

Use a name reflecting the feature (not CommonWell V2).

-export async function isAnalyticsIncrementalIngestionEnabledForCx(cxId: string): Promise<boolean> {
-  const cxIdsWithCommonwellV2Enabled = await getCxsEnabledForAnalyticsIncrementalIngestion();
-  return cxIdsWithCommonwellV2Enabled.some(i => i === cxId);
-}
+export async function isAnalyticsIncrementalIngestionEnabledForCx(cxId: string): Promise<boolean> {
+  const cxIdsWithAnalyticsEnabled = await getCxsEnabledForAnalyticsIncrementalIngestion();
+  return cxIdsWithAnalyticsEnabled.some(i => i === cxId);
+}
packages/core/src/command/consolidated/consolidated-create.ts (1)

88-104: Update log to include ADT-sourced resource count

The message omits adtSourcedResources though they’re added to the bundle.

   log(
     `Added ${docRefsWithUpdatedMeta.length} docRefs, ` +
       `${pharmacyResources.length} pharmacy resources, ` +
-      `${labResources.length} lab resources, ` +
+      `${labResources.length} lab resources, ` +
+      `${adtSourcedResources.length} ADT-sourced resources, ` +
       `and the Patient, to a total of ${bundle.entry.length} entries`
   );
packages/core/src/util/config.ts (1)

394-396: De-risk env var rename with a temporary fallback

Add a fallback to FHIR_TO_CSV_QUEUE_URL and remove it post-rollout:

   static getFhirToCsvBulkQueueurl(""): string {
-    return getEnvVarOrFail("FHIR_TO_CSV_BULK_QUEUE_URL");
+    // TODO: remove fallback to FHIR_TO_CSV_QUEUE_URL after rollout
+    const v =
+      getEnvVar("FHIR_TO_CSV_BULK_QUEUE_URL") ?? getEnvVar("FHIR_TO_CSV_QUEUE_URL");
+    if (!v || v.trim().length < 1) {
+      throw new Error("Missing FHIR_TO_CSV_BULK_QUEUE_URL env var");
+    }
+    return v;
   }

No occurrences of FHIR_TO_CSV_QUEUE_URL or getFhirToCsvQueueurl("") remain in the repo, so this safely backfills legacy environments.

packages/data-transformation/fhir-to-csv/main.py (2)

96-96: Comment is outdated

It no longer appends “/<cx_id><patient_id><table_name>.csv” here.

Apply:

-    output_file_prefix = os.getenv("OUTPUT_PREFIX") # Will append '/<cx_id>_<patient_id>_<table_name>.csv' to it
+    # Files will be uploaded under OUTPUT_PREFIX as '<table>.csv'
+    output_file_prefix = os.getenv("OUTPUT_PREFIX")

69-72: Minor: use os.path.basename; extra replace is redundant

Small cleanup; improves readability.

Apply:

-            file_name = file.split("/")[-1].replace("/", "_")
+            file_name = os.path.basename(file)
packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts (2)

3-4: Track TODO with ticket link and narrow scope

Consider clarifying whether “incremental ingestion” will require different prefix tokens to avoid collisions.


5-7: Extract base path to a constant to avoid magic strings

Keeps path consistent and easier to change.

Apply:

-import { MetriportError } from "@metriport/shared";
+import { MetriportError } from "@metriport/shared";
+
+const SNOWFLAKE_FHIR_TO_CSV_BASE = "snowflake/fhir-to-csv";
@@
 export function buildFhirToCsvBulkBasePrefix(cxId: string): string {
-  return `snowflake/fhir-to-csv/cx=${cxId}`;
+  return `${SNOWFLAKE_FHIR_TO_CSV_BASE}/cx=${cxId}`;
 }
packages/core/src/command/analytics-platform/merge-csvs/index.ts (1)

221-229: Harden parsing to skip unexpected keys instead of failing the whole run

If a stray object doesn’t match the expected Bulk file pattern, parseTableNameFromFhirToCsvBulkFileKey() will throw and stop the job. Consider skipping with a log to improve resilience.

Apply this diff within listAllFiles():

-  files.push(
-    ...rawFileList.flatMap(obj => {
-      if (!obj.Key) return [];
-      const size = obj.Size ?? 0;
-      if (size < 1) return [];
-      const key = obj.Key;
-      if (key.endsWith("/")) return [];
-      const tableName = parseTableNameFromFhirToCsvBulkFileKey(key);
-      return { key, size, tableName };
-    })
-  );
+  files.push(
+    ...rawFileList.flatMap(obj => {
+      if (!obj.Key) return [];
+      const size = obj.Size ?? 0;
+      if (size < 1) return [];
+      const key = obj.Key;
+      if (key.endsWith("/")) return [];
+      try {
+        const tableName = parseTableNameFromFhirToCsvBulkFileKey(key);
+        return { key, size, tableName };
+      } catch {
+        log(`Skipping unrecognized key: ${key}`);
+        return [];
+      }
+    })
+  );
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-factory.ts (1)

6-11: Env-based handler selection is fine; consider an override for testing

Current dev→Direct, non-dev→Cloud split is clear. Optionally add an env override to force a handler during staging/local testing.

 export function buildFhirToCsvBulkHandler(): FhirToCsvBulkHandler {
-  if (Config.isDev()) {
+  const force = process.env.FHIR_TO_CSV_FORCE_HANDLER;
+  if (force === "direct") return new FhirToCsvBulkDirect();
+  if (force === "cloud") return new FhirToCsvBulkCloud();
+  if (Config.isDev()) {
     return new FhirToCsvBulkDirect();
   }
   return new FhirToCsvBulkCloud();
 }
packages/utils/src/analytics-platform/2-merge-csvs.ts (1)

166-173: Guard against non-patient prefixes to avoid accidental throws

Optional: filter prefixes to those containing '/pt=' before parsing to make the script more forgiving.

-  const patientIds = files.flatMap(file =>
-    file.Prefix ? parsePatientIdFromFhirToCsvBulkPatientPrefix(file.Prefix) : []
-  );
+  const patientIds = files.flatMap(file => {
+    const p = file.Prefix ?? "";
+    if (!p.includes("/pt=")) return [];
+    return [parsePatientIdFromFhirToCsvBulkPatientPrefix(p)];
+  });
packages/api/src/routes/internal/analytics-platform/index.ts (2)

11-11: Track the TODO for incremental ingestion

Please link or create the tracking ticket and reference it here to avoid stale TODOs. I can open it if you want.


28-31: Return 202 Accepted and echo identifiers for async work

This endpoint enqueues async processing; 202 better reflects that and returning identifiers aids tracing.

Apply:

-    await handler.processFhirToCsv({ cxId, jobId, patientId, outputPrefix });
-    return res.sendStatus(httpStatus.OK);
+    await handler.processFhirToCsv({ cxId, jobId, patientId, outputPrefix });
+    return res
+      .status(httpStatus.ACCEPTED)
+      .json({ cxId, jobId, patientId, outputPrefix });
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk.ts (2)

1-7: Make the request type immutable and remove redundant undefined

Optional properties are already unioned with undefined; also mark the object Readonly to prevent accidental mutation downstream.

Apply:

-export type ProcessFhirToCsvBulkRequest = {
+/** Contract for bulk FHIR-to-CSV invocation */
+export type ProcessFhirToCsvBulkRequest = Readonly<{
   jobId: string;
   cxId: string;
   patientId: string;
   outputPrefix: string;
-  timeoutInMillis?: number | undefined;
-};
+  timeoutInMillis?: number;
+}>;

9-11: Add a brief interface doc for clarity

Documenting the single method clarifies expected side effects (enqueue vs. direct).

Apply:

-export interface FhirToCsvBulkHandler {
+/** Handler abstraction for bulk FHIR-to-CSV (cloud/direct implementations). */
+export interface FhirToCsvBulkHandler {
   processFhirToCsv(request: ProcessFhirToCsvBulkRequest): Promise<void>;
 }
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-direct.ts (1)

8-23: Add a lightweight runtime guard for outputPrefix.

Type checks won’t prevent an empty string at runtime. Fail fast to avoid writing to an unintended S3 location.

   async processFhirToCsv({
     cxId,
     jobId,
     patientId,
     outputPrefix,
     timeoutInMillis,
   }: ProcessFhirToCsvBulkRequest): Promise<void> {
+    if (!outputPrefix?.trim()) {
+      throw new Error("outputPrefix must be a non-empty string");
+    }
     await startFhirToCsvTransform({
       cxId,
       jobId,
       patientId,
       outputPrefix,
       timeoutInMillis,
     });
     if (this.waitTimeInMillis > 0) await sleep(this.waitTimeInMillis);
   }
packages/utils/src/analytics-platform/1-fhir-to-csv.ts (3)

116-117: Instantiate once (good); consider hoisting the computed outputPrefix.

Tiny micro-opt/readability: compute the constant once and reuse in the loop.

Example (outside the loop):

const outputPrefix = buildFhirToCsvBulkJobPrefix({ cxId, jobId: fhirToCsvJobId });

…and then pass just outputPrefix in processFhirToCsv.


147-158: Clarify success vs total in the final log line.

If there are failures, “ALL sent to queue” can read as misleading. Consider adjusting the message to “Sent X/Y patients to queue”.


162-185: Unify logging helper usage in catch branch.

This function uses console.log while the rest uses out(). Consider using the same logger for consistency.

packages/infra/lib/analytics-platform/analytics-platform-stack.ts (4)

31-45: Timeout/visibility alignment looks good; consider enabling SQS long polling

Adding receiveMessageWaitTime reduces empty receives and cost. Also, maxReceiveCount: 1 is strict—confirm this is intentional.

Apply:

       maxReceiveCount: 1,
+      receiveMessageWaitTime: Duration.seconds(20),
       visibilityTimeout: Duration.seconds(fhirToCsvBulkLambdaTimeout.toSeconds() * 2 + 1),

171-185: Remove unused args when calling setupFhirToCsvBulkLambda

analyticsPlatformBucket and medicalDocumentsBucket aren’t used by the bulk setup.

Apply:

-        analyticsPlatformBucket,
-        medicalDocumentsBucket: props.medicalDocumentsBucket,
         fhirToCsvTransformLambda,

299-315: Avoid shadowing the imported lambda module by renaming the local variable

Shadowing hurts readability and can confuse type references.

Apply:

-    const lambda = createLambda({
+    const bulkLambda = createLambda({
       ...lambdaSettings,
       stack: this,
       name,
       entry,
       envType,
       envVars: {
         // API_URL set on the api-stack after the OSS API is created
         WAIT_TIME_IN_MILLIS: waitTime.toMilliseconds().toString(),
         FHIR_TO_CSV_TRANSFORM_LAMBDA_NAME: fhirToCsvTransformLambda.functionName,
         ...(sentryDsn ? { SENTRY_DSN: sentryDsn } : {}),
       },
       layers: [lambdaLayers.shared],
       vpc,
       alarmSnsAction: alarmAction,
     });

316-320: Follow-up to rename: update references and return value

Consistent with the variable rename above.

Apply:

-    lambda.addEventSource(new SqsEventSource(queue, eventSourceSettings));
-    fhirToCsvTransformLambda.grantInvoke(lambda);
-
-    return { lambda, queue };
+    bulkLambda.addEventSource(new SqsEventSource(queue, eventSourceSettings));
+    fhirToCsvTransformLambda.grantInvoke(bulkLambda);
+
+    return { lambda: bulkLambda, queue };
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled
  • Linear integration is disabled

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 3732735 and 1c1b6d9.

📒 Files selected for processing (26)
  • packages/api/src/routes/internal/analytics-platform/index.ts (2 hunks)
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts (1 hunks)
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-direct.ts (1 hunks)
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-factory.ts (1 hunks)
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk.ts (1 hunks)
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-transform.ts (2 hunks)
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv/fhir-to-csv-factory.ts (0 hunks)
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv/fhir-to-csv.ts (0 hunks)
  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts (3 hunks)
  • packages/core/src/command/analytics-platform/incremental-ingestion.ts (1 hunks)
  • packages/core/src/command/analytics-platform/merge-csvs/index.ts (3 hunks)
  • packages/core/src/command/consolidated/consolidated-create.ts (3 hunks)
  • packages/core/src/command/feature-flags/domain-ffs.ts (1 hunks)
  • packages/core/src/command/feature-flags/ffs-on-dynamodb.ts (1 hunks)
  • packages/core/src/command/feature-flags/types.ts (1 hunks)
  • packages/core/src/util/config.ts (1 hunks)
  • packages/data-transformation/fhir-to-csv/main.py (5 hunks)
  • packages/data-transformation/fhir-to-csv/src/utils/file.py (0 hunks)
  • packages/infra/lib/analytics-platform/analytics-platform-stack.ts (10 hunks)
  • packages/infra/lib/analytics-platform/types.ts (1 hunks)
  • packages/infra/lib/api-stack.ts (1 hunks)
  • packages/infra/lib/api-stack/api-service.ts (2 hunks)
  • packages/lambdas/src/analytics-platform/fhir-to-csv-bulk.ts (2 hunks)
  • packages/lambdas/src/shared/analytics-platform.ts (1 hunks)
  • packages/utils/src/analytics-platform/1-fhir-to-csv.ts (3 hunks)
  • packages/utils/src/analytics-platform/2-merge-csvs.ts (2 hunks)
💤 Files with no reviewable changes (3)
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv/fhir-to-csv.ts
  • packages/data-transformation/fhir-to-csv/src/utils/file.py
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv/fhir-to-csv-factory.ts
🧰 Additional context used
📓 Path-based instructions (2)
**/*.{js,jsx,ts,tsx}

📄 CodeRabbit inference engine (.cursorrules)

**/*.{js,jsx,ts,tsx}: Don’t use null inside the app, only on code interacting with external interfaces/services, like DB and HTTP; convert to undefined before sending inwards into the code
Use const whenever possible
Use async/await instead of .then()
Naming: classes, enums: PascalCase
Naming: constants, variables, functions: camelCase
Naming: file names: kebab-case
Naming: Don’t use negative names, like notEnabled, prefer isDisabled
If possible, use decomposing objects for function parameters
Prefer Nullish Coalesce (??) than the OR operator (||) when you want to provide a default value
Avoid creating arrow functions
Use truthy syntax instead of in - i.e., if (data.link) not if ('link' in data)
While handling errors, keep the stack trace around: if you create a new Error (e.g., MetriportError), make sure to pass the original error as the new one’s cause so the stack trace is available upstream.
max column length is 100 chars
multi-line comments use /** */
top-level comments go after the import (save pre-import to basic file header, like license)
move literals to constants declared after imports when possible

Files:

  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-factory.ts
  • packages/lambdas/src/shared/analytics-platform.ts
  • packages/infra/lib/api-stack.ts
  • packages/core/src/util/config.ts
  • packages/core/src/command/analytics-platform/incremental-ingestion.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-transform.ts
  • packages/lambdas/src/analytics-platform/fhir-to-csv-bulk.ts
  • packages/core/src/command/feature-flags/types.ts
  • packages/core/src/command/feature-flags/domain-ffs.ts
  • packages/infra/lib/api-stack/api-service.ts
  • packages/core/src/command/analytics-platform/merge-csvs/index.ts
  • packages/core/src/command/feature-flags/ffs-on-dynamodb.ts
  • packages/infra/lib/analytics-platform/types.ts
  • packages/utils/src/analytics-platform/2-merge-csvs.ts
  • packages/api/src/routes/internal/analytics-platform/index.ts
  • packages/core/src/command/consolidated/consolidated-create.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-direct.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts
  • packages/utils/src/analytics-platform/1-fhir-to-csv.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts
  • packages/infra/lib/analytics-platform/analytics-platform-stack.ts
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursorrules)

Use types whenever possible

**/*.{ts,tsx}: Prefer functional programming style functions: small, deterministic, single input and output
Avoid modifying objects received as parameters
Naming: classes and enums use PascalCase
Naming: constants, variables, and functions use camelCase
File names use kebab-case
Avoid negative names (e.g., prefer isDisabled to notEnabled)
Include units in numeric variable names when the type doesn’t convey the unit
TypeScript: use explicit types
TypeScript: prefer const over let
TypeScript: avoid any and casting from any to other types
TypeScript: use type predicates only for narrowing, not for full type coercion
TypeScript: prefer destructuring function parameters instead of multiple same-typed parameters
TypeScript: don’t use null inside the app; only at external boundaries and convert to undefined internally
TypeScript: use async/await instead of .then()
TypeScript: use strict equality ===; do not use ==
When calling a Promise-returning function without awaiting, chain .catch() to handle errors
Date/Time: always use buildDayjs() to create dayjs instances
Date/Time: prefer dayjs.duration(...) and keep values as duration
Prefer nullish coalescing (??) over OR (||) for default values
Avoid creating arrow functions
Use truthy property checks instead of the in operator (e.g., if (data.link))
Error handling: pass the original error as cause when wrapping errors
Error handling: use static error messages; put dynamic data in MetriportError.additionalInfo; error object last parameter
Global constants: move literals to constants after imports (avoid magic numbers)
Avoid multi-line logs: do not pass objects as the second parameter to console.log() or out().log()
Avoid multi-line logs: do not create multi-line strings with JSON.stringify()
Max column length is 100 characters
Use multi-line comments with /** */

Files:

  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-factory.ts
  • packages/lambdas/src/shared/analytics-platform.ts
  • packages/infra/lib/api-stack.ts
  • packages/core/src/util/config.ts
  • packages/core/src/command/analytics-platform/incremental-ingestion.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-transform.ts
  • packages/lambdas/src/analytics-platform/fhir-to-csv-bulk.ts
  • packages/core/src/command/feature-flags/types.ts
  • packages/core/src/command/feature-flags/domain-ffs.ts
  • packages/infra/lib/api-stack/api-service.ts
  • packages/core/src/command/analytics-platform/merge-csvs/index.ts
  • packages/core/src/command/feature-flags/ffs-on-dynamodb.ts
  • packages/infra/lib/analytics-platform/types.ts
  • packages/utils/src/analytics-platform/2-merge-csvs.ts
  • packages/api/src/routes/internal/analytics-platform/index.ts
  • packages/core/src/command/consolidated/consolidated-create.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-direct.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts
  • packages/utils/src/analytics-platform/1-fhir-to-csv.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts
  • packages/infra/lib/analytics-platform/analytics-platform-stack.ts
🧠 Learnings (29)
📓 Common learnings
Learnt from: leite08
PR: metriport/metriport#4469
File: packages/fhir-converter/src/lib/workers/worker.js:75-87
Timestamp: 2025-08-26T02:33:26.777Z
Learning: User leite08 confirmed that the debug logging approach in packages/fhir-converter/src/lib/workers/worker.js was already reviewed and approved in the feature PR, even though it logs both error messages and stack traces.
Learnt from: leite08
PR: metriport/metriport#4255
File: packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv/fhir-to-csv-direct.ts:19-20
Timestamp: 2025-07-27T00:40:32.149Z
Learning: User leite08 acknowledged the duplicate inputBundle spread issue in packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv/fhir-to-csv-direct.ts and will fix it in a follow-up PR rather than in the current patch release PR #4255.
Learnt from: leite08
PR: metriport/metriport#4013
File: packages/core/src/fhir-to-cda/cda-templates/components/procedures.ts:104-105
Timestamp: 2025-06-12T22:53:09.606Z
Learning: User leite08 prefers responses in English only.
Learnt from: leite08
PR: metriport/metriport#3991
File: packages/core/src/external/aws/s3.ts:621-630
Timestamp: 2025-06-10T22:20:21.203Z
Learning: User leite08 prefers that all review comments are written in English.
📚 Learning: 2025-08-27T21:24:19.963Z
Learnt from: leite08
PR: metriport/metriport#4350
File: packages/utils/src/analytics-platform/1-fhir-to-csv.ts:129-133
Timestamp: 2025-08-27T21:24:19.963Z
Learning: In packages/utils/src/analytics-platform/1-fhir-to-csv.ts, the SQS FIFO messageDeduplicationId is intentionally set to just patientId (not scoped to jobId) to prevent any duplicate patient processing across job runs and retries, as confirmed by leite08. This ensures strict patient-level idempotency in the analytics pipeline.

Applied to files:

  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk.ts
  • packages/core/src/command/analytics-platform/incremental-ingestion.ts
  • packages/lambdas/src/analytics-platform/fhir-to-csv-bulk.ts
  • packages/core/src/command/analytics-platform/merge-csvs/index.ts
  • packages/utils/src/analytics-platform/2-merge-csvs.ts
  • packages/api/src/routes/internal/analytics-platform/index.ts
  • packages/core/src/command/consolidated/consolidated-create.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-direct.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts
  • packages/utils/src/analytics-platform/1-fhir-to-csv.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts
📚 Learning: 2025-07-18T09:33:29.581Z
Learnt from: thomasyopes
PR: metriport/metriport#4165
File: packages/api/src/routes/internal/analytics-platform/index.ts:28-28
Timestamp: 2025-07-18T09:33:29.581Z
Learning: In packages/api/src/routes/internal/analytics-platform/index.ts, the patientId parameter handling is intentionally inconsistent: the `/fhir-to-csv` and `/fhir-to-csv/transform` endpoints require patientId (using getFromQueryOrFail) for single patient processing, while the `/fhir-to-csv/batch` endpoint treats patientId as optional (using getFromQuery) because it's designed to run over all patients when no specific patient ID is provided.

Applied to files:

  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk.ts
  • packages/core/src/command/analytics-platform/incremental-ingestion.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-transform.ts
  • packages/core/src/command/analytics-platform/merge-csvs/index.ts
  • packages/utils/src/analytics-platform/2-merge-csvs.ts
  • packages/api/src/routes/internal/analytics-platform/index.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-direct.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts
  • packages/utils/src/analytics-platform/1-fhir-to-csv.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts
📚 Learning: 2025-08-26T23:48:06.638Z
Learnt from: RadmirGari
PR: metriport/metriport#4448
File: packages/infra/lib/lambdas-nested-stack.ts:1049-1054
Timestamp: 2025-08-26T23:48:06.638Z
Learning: In packages/infra/lib/lambdas-nested-stack.ts, the API_URL environment variable for HL7 roster upload lambdas is correctly set to config.loadBalancerDnsName without needing an explicit https:// scheme prefix. The configuration works as intended.

Applied to files:

  • packages/infra/lib/api-stack.ts
  • packages/infra/lib/api-stack/api-service.ts
📚 Learning: 2025-07-09T20:41:08.549Z
Learnt from: thomasyopes
PR: metriport/metriport#4167
File: packages/infra/lib/analytics-platform/analytics-platform-stack.ts:56-64
Timestamp: 2025-07-09T20:41:08.549Z
Learning: In the analytics platform Snowflake integration (packages/infra/lib/analytics-platform/analytics-platform-stack.ts), the `integrationUserArn` field in the config temporarily contains an AWS account ID rather than an ARN. This is planned to be updated to an actual ARN in a second pass, at which point the IAM Role's assumedBy should be changed from AccountPrincipal to ArnPrincipal.

Applied to files:

  • packages/infra/lib/api-stack.ts
📚 Learning: 2025-05-20T21:26:26.804Z
Learnt from: leite08
PR: metriport/metriport#3814
File: packages/api/src/routes/internal/medical/patient-consolidated.ts:141-174
Timestamp: 2025-05-20T21:26:26.804Z
Learning: The functionality introduced in packages/api/src/routes/internal/medical/patient-consolidated.ts is planned to be refactored in downstream PR #3857, including improvements to error handling and validation.

Applied to files:

  • packages/core/src/command/analytics-platform/incremental-ingestion.ts
  • packages/utils/src/analytics-platform/2-merge-csvs.ts
  • packages/api/src/routes/internal/analytics-platform/index.ts
  • packages/core/src/command/consolidated/consolidated-create.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts
📚 Learning: 2025-06-01T02:28:19.913Z
Learnt from: leite08
PR: metriport/metriport#3944
File: packages/core/src/command/consolidated/search/fhir-resource/ingest-lexical.ts:53-53
Timestamp: 2025-06-01T02:28:19.913Z
Learning: In packages/core/src/command/consolidated/search/fhir-resource/ingest-lexical.ts, the processErrors function intentionally throws MetriportError to bubble errors up the call stack rather than handling them locally. This is by design - errors from ingestPatientConsolidated should propagate upward rather than being caught at immediate calling locations.

Applied to files:

  • packages/core/src/command/analytics-platform/incremental-ingestion.ts
  • packages/core/src/command/consolidated/consolidated-create.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts
📚 Learning: 2025-07-27T00:40:32.149Z
Learnt from: leite08
PR: metriport/metriport#4255
File: packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv/fhir-to-csv-direct.ts:19-20
Timestamp: 2025-07-27T00:40:32.149Z
Learning: User leite08 acknowledged the duplicate inputBundle spread issue in packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv/fhir-to-csv-direct.ts and will fix it in a follow-up PR rather than in the current patch release PR #4255.

Applied to files:

  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-transform.ts
  • packages/lambdas/src/analytics-platform/fhir-to-csv-bulk.ts
  • packages/core/src/command/analytics-platform/merge-csvs/index.ts
  • packages/utils/src/analytics-platform/2-merge-csvs.ts
  • packages/core/src/command/consolidated/consolidated-create.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-direct.ts
📚 Learning: 2025-08-21T20:18:02.196Z
Learnt from: RamilGaripov
PR: metriport/metriport#4436
File: packages/core/src/command/feature-flags/types.ts:45-46
Timestamp: 2025-08-21T20:18:02.196Z
Learning: In the Metriport feature flags system, new schema keys in packages/core/src/command/feature-flags/types.ts are handled for backward compatibility through the initialFeatureFlags pattern in ffs-on-dynamodb.ts rather than adding schema-level defaults with .default(). This maintains consistency across all feature flag handling.

Applied to files:

  • packages/core/src/command/feature-flags/types.ts
  • packages/core/src/command/feature-flags/domain-ffs.ts
  • packages/core/src/command/feature-flags/ffs-on-dynamodb.ts
📚 Learning: 2025-04-01T20:57:29.282Z
Learnt from: leite08
PR: metriport/metriport#3550
File: packages/api/src/external/commonwell/__tests__/patient.test.ts:5-5
Timestamp: 2025-04-01T20:57:29.282Z
Learning: PR #3574 is the follow-up PR that moves the domain feature flags from packages/api/src/aws/app-config.ts to packages/core/src/command/feature-flags/domain-ffs.ts.

Applied to files:

  • packages/core/src/command/feature-flags/types.ts
  • packages/core/src/command/feature-flags/domain-ffs.ts
  • packages/core/src/command/feature-flags/ffs-on-dynamodb.ts
📚 Learning: 2025-08-21T16:24:56.979Z
Learnt from: RamilGaripov
PR: metriport/metriport#4434
File: packages/core/src/command/feature-flags/domain-ffs.ts:311-318
Timestamp: 2025-08-21T16:24:56.979Z
Learning: In the Metriport codebase, feature flag helper functions in packages/core/src/command/feature-flags/domain-ffs.ts follow a consistent pattern with getCxsWithXXXFeatureFlag() and isXXXEnabledForCx() functions, and individual unit tests are not added for each pair since they follow the same reliable pattern used throughout the codebase.

Applied to files:

  • packages/core/src/command/feature-flags/types.ts
  • packages/core/src/command/feature-flags/domain-ffs.ts
  • packages/core/src/command/feature-flags/ffs-on-dynamodb.ts
📚 Learning: 2025-08-22T01:55:48.135Z
Learnt from: Orta21
PR: metriport/metriport#4357
File: packages/core/src/command/feature-flags/ffs-on-dynamodb.ts:46-46
Timestamp: 2025-08-22T01:55:48.135Z
Learning: In the Metriport feature flags system, when adding new feature flag keys, the team prefers to maintain the existing parsing approach in ffs-on-dynamodb.ts rather than switching to safeParse for backward compatibility, relying instead on the established initialFeatureFlags pattern.

Applied to files:

  • packages/core/src/command/feature-flags/types.ts
  • packages/core/src/command/feature-flags/domain-ffs.ts
  • packages/core/src/command/feature-flags/ffs-on-dynamodb.ts
📚 Learning: 2025-04-01T20:57:29.282Z
Learnt from: leite08
PR: metriport/metriport#3550
File: packages/api/src/external/commonwell/__tests__/patient.test.ts:5-5
Timestamp: 2025-04-01T20:57:29.282Z
Learning: PR #3574 is the follow-up PR that moves the domain feature flags from app-config.ts to the domain folder in packages/core.

Applied to files:

  • packages/core/src/command/feature-flags/domain-ffs.ts
📚 Learning: 2025-04-01T19:59:22.396Z
Learnt from: leite08
PR: metriport/metriport#3550
File: packages/api/src/external/commonwell/__tests__/patient.test.ts:5-5
Timestamp: 2025-04-01T19:59:22.396Z
Learning: The file at `packages/api/src/aws/app-config.ts` contains domain feature flags (not AWS AppConfig specific functionality) and is planned to be moved to the domain folder in packages/core in a future PR.

Applied to files:

  • packages/core/src/command/feature-flags/domain-ffs.ts
  • packages/core/src/command/feature-flags/ffs-on-dynamodb.ts
📚 Learning: 2025-08-20T19:28:41.957Z
Learnt from: RadmirGari
PR: metriport/metriport#4416
File: packages/infra/lib/secrets-stack.ts:0-0
Timestamp: 2025-08-20T19:28:41.957Z
Learning: In packages/infra/lib/secrets-stack.ts, when accessing props.config.hl7Notification.hieConfigs in non-sandbox environments, the team prefers to let it fail if hl7Notification is missing rather than using optional chaining, as missing hl7Notification configuration indicates a serious issue that should halt deployment.

Applied to files:

  • packages/infra/lib/api-stack/api-service.ts
📚 Learning: 2025-08-20T12:40:34.100Z
Learnt from: leite08
PR: metriport/metriport#4350
File: packages/utils/src/analytics-platform/2-merge-csvs.ts:58-61
Timestamp: 2025-08-20T12:40:34.100Z
Learning: In packages/utils/src/analytics-platform/2-merge-csvs.ts, the constant messageGroupId "merge-csvs" for SQS FIFO processing is intentional by user leite08, indicating they want sequential processing rather than concurrent chunk processing for the merge CSV workflow.

Applied to files:

  • packages/core/src/command/analytics-platform/merge-csvs/index.ts
  • packages/utils/src/analytics-platform/2-merge-csvs.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts
  • packages/utils/src/analytics-platform/1-fhir-to-csv.ts
  • packages/infra/lib/analytics-platform/analytics-platform-stack.ts
📚 Learning: 2025-05-26T16:24:10.245Z
Learnt from: leite08
PR: metriport/metriport#3859
File: packages/utils/src/consolidated/check-patient-consolidated.ts:28-32
Timestamp: 2025-05-26T16:24:10.245Z
Learning: For utility scripts in the packages/utils directory, developers are expected to update the main constants at the top of the file (like bundleFilePath, cxId, patientId) rather than implementing command-line argument parsing or environment variable reading. This is the preferred pattern for configuring utility scripts in this codebase.

Applied to files:

  • packages/utils/src/analytics-platform/2-merge-csvs.ts
📚 Learning: 2025-06-18T00:18:59.144Z
Learnt from: leite08
PR: metriport/metriport#4043
File: packages/core/src/command/consolidated/consolidated-create.ts:0-0
Timestamp: 2025-06-18T00:18:59.144Z
Learning: In the consolidated bundle creation process (packages/core/src/command/consolidated/consolidated-create.ts), the team prefers strict failure behavior where if any component (including pharmacy resources) fails to be retrieved, the entire consolidation should fail rather than gracefully degrading. This ensures data consistency and prevents incomplete bundles.

Applied to files:

  • packages/core/src/command/consolidated/consolidated-create.ts
📚 Learning: 2025-05-28T02:32:27.527Z
Learnt from: lucasdellabella
PR: metriport/metriport#3907
File: packages/core/src/external/fhir/adt-encounters.ts:118-143
Timestamp: 2025-05-28T02:32:27.527Z
Learning: In packages/core/src/external/fhir/adt-encounters.ts, conversion bundles stored by saveAdtConversionBundle do not require version ID assertions or versioning, unlike the sourced encounter data stored by putAdtSourcedEncounter which does require versionId validation.

Applied to files:

  • packages/core/src/command/consolidated/consolidated-create.ts
📚 Learning: 2025-05-31T21:58:28.502Z
Learnt from: leite08
PR: metriport/metriport#3940
File: packages/core/src/command/consolidated/search/fhir-resource/search-consolidated.ts:82-86
Timestamp: 2025-05-31T21:58:28.502Z
Learning: In packages/core/src/command/consolidated/search/fhir-resource/search-consolidated.ts, the mutable array operations using push() on resourcesMutable and hydratedMutable (lines 82-86 and 100-103) have been explicitly accepted as exceptions to the immutability guidelines after previous discussion.

Applied to files:

  • packages/core/src/command/consolidated/consolidated-create.ts
📚 Learning: 2025-04-16T00:25:25.196Z
Learnt from: RamilGaripov
PR: metriport/metriport#3676
File: packages/core/src/command/hl7v2-subscriptions/hl7v2-to-fhir-conversion/shared.ts:1-10
Timestamp: 2025-04-16T00:25:25.196Z
Learning: The circular dependency between shared.ts (importing getPatientIdsOrFail) and adt/utils.ts (using unpackPidFieldOrFail) will be addressed in a follow-up PR.

Applied to files:

  • packages/core/src/command/consolidated/consolidated-create.ts
📚 Learning: 2025-06-19T22:44:49.393Z
Learnt from: thomasyopes
PR: metriport/metriport#4061
File: packages/api/src/external/ehr/shared/job/bundle/create-resource-diff-bundles/run-job.ts:34-47
Timestamp: 2025-06-19T22:44:49.393Z
Learning: In packages/api/src/external/ehr/shared/job/bundle/create-resource-diff-bundles/run-job.ts, the team prefers to keep refreshEhrBundles operations grouped using fire-and-forget pattern rather than awaiting all operations with Promise.allSettled(). This allows individual refresh operations to run independently without blocking the job runner.

Applied to files:

  • packages/core/src/command/consolidated/consolidated-create.ts
  • packages/utils/src/analytics-platform/1-fhir-to-csv.ts
📚 Learning: 2025-08-21T01:50:04.471Z
Learnt from: keshavsaharia
PR: metriport/metriport#4394
File: packages/core/src/external/quest/command/fhir-converter/fhir-converter-direct.ts:5-8
Timestamp: 2025-08-21T01:50:04.471Z
Learning: In packages/core/src/external/quest/command/fhir-converter/fhir-converter-direct.ts, the QuestFhirConverterCommandDirect class is intentionally implemented as a stub/TODO for ENG-864, as confirmed by keshavsaharia. The Quest FHIR conversion feature is being developed in phases - the current PR establishes the pipeline structure, and the next PR will implement the actual conversion logic.

Applied to files:

  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-direct.ts
📚 Learning: 2025-06-25T01:56:08.732Z
Learnt from: thomasyopes
PR: metriport/metriport#4090
File: packages/core/src/command/conversion-fhir/conversion-fhir-direct.ts:18-25
Timestamp: 2025-06-25T01:56:08.732Z
Learning: In packages/core/src/command/conversion-fhir/conversion-fhir-direct.ts, the ConversionFhirDirect class is designed for local testing purposes and does not require comprehensive error handling for HTTP requests, as confirmed by the user thomasyopes.

Applied to files:

  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-direct.ts
📚 Learning: 2025-07-18T09:53:38.906Z
Learnt from: thomasyopes
PR: metriport/metriport#4165
File: packages/data-transformation/fhir-to-csv/main.py:0-0
Timestamp: 2025-07-18T09:53:38.906Z
Learning: In packages/data-transformation/fhir-to-csv/main.py, S3 upload operations should be allowed to hard fail without try-catch blocks. The team prefers immediate failure over error handling and continuation for S3 upload operations in the FHIR-to-CSV transformation process.

Applied to files:

  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts
  • packages/data-transformation/fhir-to-csv/main.py
📚 Learning: 2025-03-11T20:42:46.516Z
Learnt from: thomasyopes
PR: metriport/metriport#3427
File: packages/core/src/external/ehr/api/sync-patient.ts:16-55
Timestamp: 2025-03-11T20:42:46.516Z
Learning: In the patient synchronization architecture, the flow follows this pattern: (1) `ehr-sync-patient-cloud.ts` sends messages to an SQS queue, (2) the `ehr-sync-patient` Lambda consumes these messages, and (3) the Lambda uses the `syncPatient` function to make the API calls to process the patient data.

Applied to files:

  • packages/utils/src/analytics-platform/1-fhir-to-csv.ts
📚 Learning: 2025-05-28T19:23:20.179Z
Learnt from: keshavsaharia
PR: metriport/metriport#3885
File: packages/core/src/external/sftp/surescripts/client.ts:1-134
Timestamp: 2025-05-28T19:23:20.179Z
Learning: In packages/core/src/external/sftp/surescripts/client.ts, the standalone getPatientLoadFileName function intentionally omits the transmission ID from the filename, which differs from the class method getPatientLoadFileName. This difference in filename generation is expected behavior.

Applied to files:

  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts
📚 Learning: 2025-08-25T23:28:41.385Z
Learnt from: keshavsaharia
PR: metriport/metriport#4459
File: packages/core/src/external/quest/fhir/patient.ts:22-24
Timestamp: 2025-08-25T23:28:41.385Z
Learning: FHIR resources should have their ID field determined by `uuidv7()` generated UUIDs. The import should be: `import { uuidv7 } from "metriport/shared/util/uuid-v7";`. External system IDs should not be used directly as FHIR resource IDs, even when sanitized, but should instead be preserved in the identifier field for reference mapping.

Applied to files:

  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts
📚 Learning: 2025-05-08T19:41:36.533Z
Learnt from: thomasyopes
PR: metriport/metriport#3788
File: packages/api/src/external/ehr/shared/utils/bundle.ts:83-93
Timestamp: 2025-05-08T19:41:36.533Z
Learning: In the Metriport codebase, the team prefers to let errors bubble up naturally in some cases rather than adding explicit error handling at every layer, as demonstrated in the refreshEhrBundle function in the bundle.ts file.

Applied to files:

  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts
🧬 Code graph analysis (13)
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-factory.ts (4)
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk.ts (1)
  • FhirToCsvBulkHandler (9-11)
packages/core/src/util/config.ts (1)
  • Config (11-404)
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-direct.ts (1)
  • FhirToCsvBulkDirect (5-24)
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts (1)
  • FhirToCsvBulkCloud (6-23)
packages/core/src/util/config.ts (1)
packages/shared/src/common/env-var.ts (1)
  • getEnvVarOrFail (14-20)
packages/core/src/command/analytics-platform/incremental-ingestion.ts (2)
packages/core/src/util/log.ts (2)
  • out (30-35)
  • log (10-23)
packages/core/src/command/feature-flags/domain-ffs.ts (1)
  • isAnalyticsIncrementalIngestionEnabledForCx (331-334)
packages/lambdas/src/analytics-platform/fhir-to-csv-bulk.ts (1)
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-direct.ts (1)
  • FhirToCsvBulkDirect (5-24)
packages/core/src/command/analytics-platform/merge-csvs/index.ts (1)
packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts (2)
  • buildFhirToCsvBulkPatientPrefix (19-29)
  • parseTableNameFromFhirToCsvBulkFileKey (31-38)
packages/utils/src/analytics-platform/2-merge-csvs.ts (2)
packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts (2)
  • buildFhirToCsvBulkJobPrefix (9-17)
  • parsePatientIdFromFhirToCsvBulkPatientPrefix (40-48)
packages/mllp-server/src/utils.ts (2)
  • s3Utils (18-18)
  • bucketName (19-19)
packages/api/src/routes/internal/analytics-platform/index.ts (4)
packages/lambdas/src/analytics-platform/fhir-to-csv-bulk.ts (1)
  • handler (17-38)
packages/data-transformation/fhir-to-csv/main.py (1)
  • handler (90-125)
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-factory.ts (1)
  • buildFhirToCsvBulkHandler (6-11)
packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts (1)
  • buildFhirToCsvBulkJobPrefix (9-17)
packages/core/src/command/consolidated/consolidated-create.ts (2)
packages/core/src/command/analytics-platform/incremental-ingestion.ts (1)
  • ingestPatientIntoAnalyticsPlatform (5-27)
packages/core/src/util/error/shared.ts (1)
  • processAsyncError (57-71)
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-direct.ts (2)
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk.ts (2)
  • FhirToCsvBulkHandler (9-11)
  • ProcessFhirToCsvBulkRequest (1-7)
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-transform.ts (1)
  • startFhirToCsvTransform (14-45)
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts (2)
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk.ts (2)
  • FhirToCsvBulkHandler (9-11)
  • ProcessFhirToCsvBulkRequest (1-7)
packages/shared/src/net/retry.ts (1)
  • executeWithNetworkRetries (111-138)
packages/data-transformation/fhir-to-csv/main.py (1)
packages/data-transformation/fhir-to-csv/src/utils/file.py (1)
  • create_consolidated_key (3-4)
packages/utils/src/analytics-platform/1-fhir-to-csv.ts (3)
packages/core/src/external/aws/s3.ts (1)
  • S3Utils (141-622)
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts (1)
  • FhirToCsvBulkCloud (6-23)
packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts (1)
  • buildFhirToCsvBulkJobPrefix (9-17)
packages/infra/lib/analytics-platform/analytics-platform-stack.ts (3)
packages/infra/lib/shared/settings.ts (1)
  • QueueAndLambdaSettings (15-49)
packages/infra/config/env-config.ts (1)
  • EnvConfigNonSandbox (304-317)
packages/infra/lib/shared/lambda.ts (1)
  • createLambda (85-158)
🪛 Ruff (0.12.2)
packages/data-transformation/fhir-to-csv/main.py

109-109: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: check-pr / lint-build-test
  • GitHub Check: check-pr / lint-build-test
  • GitHub Check: check-pr / lint-build-test
  • GitHub Check: check-pr / lint-build-test
  • GitHub Check: check-pr / lint-build-test
  • GitHub Check: Analyze (javascript)
🔇 Additional comments (32)
packages/lambdas/src/shared/analytics-platform.ts (1)

8-8: Requiring outputPrefix is a breaking change—manually verify all payload builders include this field. Automated search (rg) returned no hits for fhirToCsvSchema.parse/safeParse or prefix‐helper functions; confirm all writers (API, core, utils, other lambdas) are updated or implement a transitional support path.

packages/core/src/command/feature-flags/ffs-on-dynamodb.ts (1)

70-71: Default for new flag added correctly

analyticsIncrementalIngestion follows the established initialFeatureFlags pattern for back-compat.

packages/core/src/command/feature-flags/types.ts (1)

47-48: Schema extension looks good

analyticsIncrementalIngestion added to cxBasedFFsSchema; aligns with usage/helpers and initial defaults.

packages/core/src/command/feature-flags/domain-ffs.ts (1)

328-330: Helper pair added consistently

getCxsEnabledForAnalyticsIncrementalIngestion mirrors existing patterns and reuses getCxsWithFeatureFlagEnabled.

packages/core/src/command/consolidated/consolidated-create.ts (2)

70-85: Including ADT-sourced resources via Promise.all is OK

This keeps strict failure semantics consistent with prior decisions; if ADT retrieval fails, consolidation fails.


172-174: Fire-and-forget ingestion with centralized error handling

Async invocation with processAsyncError matches existing operational style and won’t block consolidation.

packages/core/src/command/analytics-platform/incremental-ingestion.ts (1)

5-27: Good guarded stub and wiring

Logs are scoped; feature-flag gating prevents unnecessary work; integration point is ready for ENG-743 implementation.

packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-transform.ts (2)

28-30: Double-check API_URL scheme handling

If API_URL already contains http/https, prepending http:// will produce an invalid URL.

Please verify ENV provides a host without scheme for this path, or switch to using API_LB_ADDRESS explicitly here.


10-12: Ensure patient-scoped outputPrefix is used
Handlers like FhirToCsvBulkDirect and FhirToCsvBulkCloud simply forward the outputPrefix from ProcessFhirToCsvBulkRequest. Confirm that the code constructing these requests (e.g. the CLI or bulk-job initializer) invokes

buildFhirToCsvBulkPatientPrefix({ cxId, jobId, patientId })

for its outputPrefix to avoid cross-patient file collisions.

packages/infra/lib/analytics-platform/types.ts (1)

5-8: Verified imports and consumers updated
No occurrences of the old fhirToCsvLambda remain; all references now use fhirToCsvBulkLambda and mergeCsvsLambda as intended.

packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts (1)

19-29: LGTM on bulk prefixes

Prefix structure is clear and consistent: cx → f2c → pt.

packages/infra/lib/api-stack.ts (1)

799-801: Switch to fhirToCsvBulkLambda in API_URL injection looks correct

Good alignment with the bulk architecture. Verify that AnalyticsPlatformsNestedStack.getAssets() exposes fhirToCsvBulkLambda and that this lambda actually needs API_URL (otherwise harmless to keep).

packages/core/src/command/analytics-platform/merge-csvs/index.ts (2)

8-10: Imports updated to Bulk helpers — consistent with new key structure

Matches the new Bulk file-name schema.


203-208: Use Bulk patient prefix for S3 listing — LGTM

Prefix construction aligns with buildFhirToCsvBulkPatientPrefix() semantics.

packages/lambdas/src/analytics-platform/fhir-to-csv-bulk.ts (2)

1-1: Import switched to BulkDirect — correct for SQS consumer

Matches the new bulk flow.


29-29: Instantiate BulkDirect here (not the factory) — good call

Using the direct handler avoids re-enqueuing and prevents loops.

packages/utils/src/analytics-platform/2-merge-csvs.ts (1)

5-7: Updated to Bulk prefix/parse helpers — matches new layout

Keeps this script in sync with the Bulk S3 key structure.

packages/infra/lib/api-stack/api-service.ts (2)

583-589: SQS permissions updated to bulk queue — LGTM

Access is correctly scoped to send and guarded by the analyticsPlatformAssets check.


430-431: Drop legacy‐compat env var; it’s unused
No occurrences of FHIR_TO_CSV_QUEUE_URL exist in the repo—only FHIR_TO_CSV_BULK_QUEUE_URL is referenced—so no backward‐compatibility stub is needed.

Likely an incorrect or invalid review comment.

packages/api/src/routes/internal/analytics-platform/index.ts (2)

1-2: Imports switched to bulk handler/prefix — LGTM

Consistent with the new bulk flow.


7-7: Parameter helpers — LGTM

Using asyncHandler and getFromQueryOrFail matches existing route patterns.

packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-direct.ts (1)

5-7: LGTM: direct bulk handler wiring looks correct.

Implements the bulk interface and keeps the optional wait as before. No issues.

packages/utils/src/analytics-platform/1-fhir-to-csv.ts (2)

5-6: LGTM: switching to the core bulk cloud handler and job-prefix builder.

This aligns the script with the new bulk flow and centralizes SQS usage in core.


123-128: Approve environment wiring FHIR_TO_CSV_BULK_QUEUE_URL is correctly referenced in utils (FhirToCsvBulkCloud), core config (Config.getFhirToCsvBulkQueueUrl), and infra (api-service and analytics-platform-stack).

packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts (2)

6-11: LGTM: injectable queue URL and client with sensible defaults.

Constructor allows testability while defaulting to Config-driven values. Good.


12-22: FIFO semantics: patient-scoped dedup and ordering are correct.

Using patientId for both messageDeduplicationId and messageGroupId matches the documented idempotency goal.

Please confirm that:

  • patientId values are unique across tenants sharing the same queue, or queues are tenant-scoped.
  • the queue is configured as FIFO (including high-throughput if needed) to avoid throughput surprises.

If helpful, I can add a short runtime assert in the consumer to validate params on receipt.

packages/infra/lib/analytics-platform/analytics-platform-stack.ts (6)

84-87: LGTM: settings() now returns the bulk config

Return shape matches the new two-lambda architecture.


159-169: LGTM: transform lambda setup and permissions

Docker image lambda, env, alarms, and bucket grants look correct.


221-222: LGTM: explicit DockerImageFunction type for transform

Matches usage and grants below.


256-257: Good: read-only access to medicalDocumentsBucket for transform

Principle of least privilege is observed.


24-27: All legacy fhirToCsv references removed
No occurrences of fhirToCsv (without “Bulk”), FHIR_TO_CSV_QUEUE_URL, fhirToCsvLambda, or fhirToCsvQueue remain in the codebase.


203-206: Confirmed Bulk assets in use—no remaining old references
Ran a repository-wide search; only fhirToCsvBulkLambda and fhirToCsvBulkQueue appear (no fhirToCsvLambda or fhirToCsvQueue matches).

Comment on lines +261 to +275
private setupFhirToCsvBulkLambda(ownProps: {
config: EnvConfigNonSandbox;
envType: EnvType;
awsRegion: string;
lambdaLayers: LambdaLayers;
vpc: ec2.IVpc;
sentryDsn: string | undefined;
alarmAction: SnsAction | undefined;
analyticsPlatformBucket: s3.Bucket;
medicalDocumentsBucket: s3.Bucket;
fhirToCsvTransformLambda: lambda.Function;
}): {
lambda: lambda.DockerImageFunction;
queue: Queue;
} {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix types: accept IFunction for transform param and return Function for bulk

  • The transform param should be lambda.IFunction (works for both Function and DockerImageFunction).
  • The bulk lambda returned by createLambda is lambda.Function, not DockerImageFunction.

Apply:

   private setupFhirToCsvBulkLambda(ownProps: {
     config: EnvConfigNonSandbox;
     envType: EnvType;
     awsRegion: string;
     lambdaLayers: LambdaLayers;
     vpc: ec2.IVpc;
     sentryDsn: string | undefined;
     alarmAction: SnsAction | undefined;
-    analyticsPlatformBucket: s3.Bucket;
-    medicalDocumentsBucket: s3.Bucket;
-    fhirToCsvTransformLambda: lambda.Function;
+    fhirToCsvTransformLambda: lambda.IFunction;
   }): {
-    lambda: lambda.DockerImageFunction;
+    lambda: lambda.Function;
     queue: Queue;
   } {

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
packages/infra/lib/analytics-platform/analytics-platform-stack.ts around lines
261-275: the setupFhirToCsvBulkLambda signature and return types are too
specific; change the fhirToCsvTransformLambda parameter type from
lambda.Function to lambda.IFunction to accept both Function and
DockerImageFunction, and change the returned lambda type from
lambda.DockerImageFunction to lambda.Function (i.e., the method should return {
lambda: lambda.Function; queue: Queue; }). Update the function signature and any
related type annotations/returns in this method to match these types.

…l to send to SQS

Ref eng-743

Signed-off-by: Rafael Leite <2132564+leite08@users.noreply.github.com>
Ref eng-743

Signed-off-by: Rafael Leite <2132564+leite08@users.noreply.github.com>
Ref eng-743

Signed-off-by: Rafael Leite <2132564+leite08@users.noreply.github.com>
Ref eng-743

Signed-off-by: Rafael Leite <2132564+leite08@users.noreply.github.com>
@leite08 leite08 force-pushed the eng-743-fhir-inferno-lambda-more-flexible branch from 1c1b6d9 to 539bb22 Compare September 3, 2025 16:29
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts (1)

31-39: Make table-name parsing resilient; avoid magic index (slice(6)).

Current parsing assumes a fixed underscore layout; any format change breaks it. Prefer a regex that anchors on the known filename pattern and captures the table segment (supports underscores within table names).

Apply this diff:

 export function parseTableNameFromFhirToCsvBulkFileKey(key: string): string {
   // e.g.: snowflake/fhir-to-csv/cx=cx-id/f2c=2025-08-08T02-18-56/pt=patient-id/_tmp_fhir-to-csv_output_cx-id_patient-id_condition.csv
-  const tableName = key.split("/").pop()?.split("_").slice(6).join("_").split(".")[0];
-  if (!tableName) {
+  const baseName = key.split("/").pop();
+  const match = baseName?.match(
+    /^_tmp_fhir-to-csv_output_[^_]+_[^_]+_(.+)\.csv$/u
+  );
+  const tableName = match?.[1];
+  if (!tableName) {
     throw new MetriportError(`Failed to parse tableName from fhirToCsvFileKey`, undefined, { key });
   }
   return tableName;
 }
♻️ Duplicate comments (3)
packages/data-transformation/fhir-to-csv/main.py (1)

70-71: Restore S3 key shape to include cx_id/patient_id/table to avoid collisions and keep downstream parser compatible

Current basename-only key breaks consumers expecting the old pattern and risks collisions across patients. Reintroduce the cx_id/patient_id/table naming.

Apply this diff:

-            file_name = file.split("/")[-1].replace("/", "_")
-            output_file_key = f"{output_file_prefix}/{file_name}"
+            table_name = os.path.splitext(os.path.basename(file))[0]
+            file_name = f"_tmp_fhir-to-csv_output_{cx_id}_{patient_id}_{table_name}.csv"
+            output_file_key = f"{output_file_prefix}/{file_name}"
packages/infra/lib/analytics-platform/analytics-platform-stack.ts (2)

99-100: Type mismatch: fhirToCsvBulkLambda is a Function, not DockerImageFunction

createLambda returns lambda.Function. The current field type causes TS errors.

-  readonly fhirToCsvBulkLambda: lambda.DockerImageFunction;
+  readonly fhirToCsvBulkLambda: lambda.Function;

261-275: Fix types: accept IFunction for transform; return Function for bulk

The bulk setup should accept lambda.IFunction (works for both Function and DockerImageFunction) and return lambda.Function from createLambda.

-  private setupFhirToCsvBulkLambda(ownProps: {
+  private setupFhirToCsvBulkLambda(ownProps: {
@@
-    fhirToCsvTransformLambda: lambda.Function;
+    fhirToCsvTransformLambda: lambda.IFunction;
   }): {
-    lambda: lambda.DockerImageFunction;
+    lambda: lambda.Function;
     queue: Queue;
   } {
🧹 Nitpick comments (13)
packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts (4)

3-3: Track and scope the TODO (ENG-954) or split now.

This file currently implements only “Bulk” helpers. Please either:

  • link this TODO to an existing ticket and scope what “Incremental” will require, or
  • split the helpers now to avoid future breaking renames.

I can open a follow-up ticket and enumerate call sites that will need the incremental variants. Do you want that?


5-7: Extract root prefix constant and validate cxId.

  • Avoid inline literals; declare a top-level constant for the S3 root prefix (per guidelines).
  • Guard against illegal path chars in cxId (e.g., “/” or “=”), which would break key structure.

Apply this diff:

 export function buildFhirToCsvBulkBasePrefix(cxId: string): string {
-  return `snowflake/fhir-to-csv/cx=${cxId}`;
+  if (cxId.includes("/") || cxId.includes("=")) {
+    throw new MetriportError("Invalid cxId for S3 key", undefined, { cxId });
+  }
+  return `${FHIR_TO_CSV_ROOT_PREFIX}/cx=${cxId}`;
 }

Add this constant after imports (outside the selected lines):

const FHIR_TO_CSV_ROOT_PREFIX = "snowflake/fhir-to-csv" as const;

19-29: Validate patientId to prevent malformed S3 keys.

Patient IDs containing “/” or “=” would corrupt the prefix and make parsing brittle. Add a small guard.

Apply this diff:

 export function buildFhirToCsvBulkPatientPrefix({
   cxId,
   jobId,
   patientId,
 }: {
   cxId: string;
   jobId: string;
   patientId: string;
 }): string {
-  return `${buildFhirToCsvBulkJobPrefix({ cxId, jobId })}/pt=${patientId}`;
+  if (patientId.includes("/") || patientId.includes("=")) {
+    throw new MetriportError("Invalid patientId for S3 key", undefined, { patientId });
+  }
+  return `${buildFhirToCsvBulkJobPrefix({ cxId, jobId })}/pt=${patientId}`;
 }

40-49: Optional: tighten patientId prefix validation during parse.

Small guard improves error messages for malformed prefixes and reduces false positives.

Apply this diff:

 export function parsePatientIdFromFhirToCsvBulkPatientPrefix(key: string): string {
   // e.g.: snowflake/fhir-to-csv/cx=cx-id/f2c=2025-08-08T02-18-56/pt=patient-id/
   const withoutSlash = key.endsWith("/") ? key.slice(0, -1) : key;
-  const patientId = withoutSlash.split("/").pop()?.replace("pt=", "");
+  const lastSeg = withoutSlash.split("/").pop() ?? "";
+  if (!lastSeg.startsWith("pt=")) {
+    throw new MetriportError(`Failed to parse patientId from fhirToCsvFileKey`, undefined, { key });
+  }
+  const patientId = lastSeg.slice(3);
   if (!patientId) {
     throw new MetriportError(`Failed to parse patientId from fhirToCsvFileKey`, undefined, { key });
   }
   return patientId;
 }
packages/core/src/command/consolidated/consolidated-create.ts (2)

99-103: Log message omits ADT-sourced count

Since ADT resources are now included, reflect them in the summary log.

Apply:

-  log(
-    `Added ${docRefsWithUpdatedMeta.length} docRefs, ` +
-      `${pharmacyResources.length} pharmacy resources, ` +
-      `${labResources.length} lab resources, ` +
-      `and the Patient, to a total of ${bundle.entry.length} entries`
-  );
+  log(
+    `Added ${docRefsWithUpdatedMeta.length} docRefs, ` +
+      `${pharmacyResources.length} pharmacy resources, ` +
+      `${labResources.length} lab resources, ` +
+      `${adtSourcedResources.length} ADT-sourced resources, ` +
+      `and the Patient, to a total of ${bundle.entry.length} entries`
+  );

172-174: No-floating-promises: mark as intentional fire-and-forget

Prefix with void to make intent explicit and satisfy common ESLint rules; keep .catch(processAsyncError(...)).

Apply:

-  ingestPatientIntoAnalyticsPlatform({ cxId, patientId }).catch(
+  void ingestPatientIntoAnalyticsPlatform({ cxId, patientId }).catch(
     processAsyncError("createConsolidatedFromConversions.ingestPatientIntoAnalyticsPlatform")
   );
packages/data-transformation/fhir-to-csv/main.py (3)

96-96: Make comment consistent with behavior; allow event override and trim trailing slash

Keep the comment accurate and accept OUTPUT_PREFIX from the event for parity with other inputs.

Apply this diff:

-    output_file_prefix = os.getenv("OUTPUT_PREFIX") # Will append '/<cx_id>_<patient_id>_<table_name>.csv' to it
+    # Prefix under which CSVs will be written; function appends '<cx_id>_<patient_id>_<table_name>.csv'
+    output_file_prefix = event.get("OUTPUT_PREFIX") or os.getenv("OUTPUT_PREFIX")
+    if output_file_prefix:
+        output_file_prefix = output_file_prefix.rstrip("/")

84-86: Ensure cleanup runs on failures (move rmtree to a finally block)

If any upload fails before Line 84, /tmp may accumulate across warm invocations.

Apply this diff (schematic):

-    print(f"Cleaning up local files in {local_cx_path}")
-    shutil.rmtree(local_cx_path)
+    # Ensure cleanup even on failures
+    print(f"Cleaning up local files in {local_cx_path}")
+    shutil.rmtree(local_cx_path, ignore_errors=True)

And wrap Lines 41–83 in a try/finally, moving the cleanup into finally:

-    with open(local_bundle_key, "rb") as f:
+    try:
+        with open(local_bundle_key, "rb") as f:
           ...
-    print(f"Cleaning up local files in {local_cx_path}")
-    shutil.rmtree(local_cx_path)
+    finally:
+        print(f"Cleaning up local files in {local_cx_path}")
+        shutil.rmtree(local_cx_path, ignore_errors=True)

121-124: Avoid exit(0) in Lambda handlers; return instead

Exiting the process can affect re-use of warm containers and observability.

Apply this diff:

-    if len(output_bucket_and_file_keys_and_table_names) < 1:
-        print("No files were uploaded")
-        exit(0)
+    if len(output_bucket_and_file_keys_and_table_names) < 1:
+        print("No files were uploaded")
+        return {"status": "no_files"}
packages/infra/lib/analytics-platform/analytics-platform-stack.ts (3)

31-52: Bulk queue/lambda settings look sane; tiny naming nit

Timeouts and visibility timeout math are sensible for SQS retries. Nit: “No limit” comment (Line 19) is misleading since wait time is 0 ms (no wait), not unlimited.

Apply:

-const waitTimeFhirToCsv = Duration.seconds(0); // No limit
+const waitTimeFhirToCsv = Duration.seconds(0); // No wait

299-320: Rename local “lambda” var to avoid shadowing the aws-lambda namespace

Using “lambda” as a variable shadows the imported namespace and hurts readability.

-    const lambda = createLambda({
+    const bulkLambda = createLambda({
@@
-    lambda.addEventSource(new SqsEventSource(queue, eventSourceSettings));
-    fhirToCsvTransformLambda.grantInvoke(lambda);
+    bulkLambda.addEventSource(new SqsEventSource(queue, eventSourceSettings));
+    fhirToCsvTransformLambda.grantInvoke(bulkLambda);
 
-    return { lambda, queue };
+    return { lambda: bulkLambda, queue };

115-121: Harden S3 bucket configuration (optional)

Consider explicitly blocking public access, enforcing SSL, and bucket-owner-enforced object ownership.

 const analyticsPlatformBucket = new s3.Bucket(this, "AnalyticsPlatformBucket", {
   bucketName: props.config.analyticsPlatform.bucketName,
-  publicReadAccess: false,
+  publicReadAccess: false,
+  blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
+  enforceSSL: true,
   encryption: s3.BucketEncryption.S3_MANAGED,
   versioned: true,
+  objectOwnership: s3.ObjectOwnership.BUCKET_OWNER_ENFORCED,
 });
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts (1)

15-21: Follow guideline to avoid arrow functions; use a named fn inside the method.

Keeps style consistent and slightly improves debuggability.

Apply:

-    await executeWithNetworkRetries(async () => {
-      await this.sqsClient.sendMessageToQueue(this.fhirToCsvQueueUrl, payload, {
-        fifo: true,
-        messageDeduplicationId: patientId,
-        messageGroupId: patientId,
-      });
-    });
+    const self = this;
+    async function sendOnce(): Promise<void> {
+      await self.sqsClient.sendMessageToQueue(self.fhirToCsvQueueUrl, payload, {
+        fifo: true,
+        messageDeduplicationId: patientId,
+        messageGroupId: patientId,
+      });
+    }
+    await executeWithNetworkRetries(sendOnce);
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled
  • Linear integration is disabled

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 1c1b6d9 and 539bb22.

📒 Files selected for processing (26)
  • packages/api/src/routes/internal/analytics-platform/index.ts (2 hunks)
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts (1 hunks)
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-direct.ts (1 hunks)
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-factory.ts (1 hunks)
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk.ts (1 hunks)
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-transform.ts (2 hunks)
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv/fhir-to-csv-factory.ts (0 hunks)
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv/fhir-to-csv.ts (0 hunks)
  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts (3 hunks)
  • packages/core/src/command/analytics-platform/incremental-ingestion.ts (1 hunks)
  • packages/core/src/command/analytics-platform/merge-csvs/index.ts (3 hunks)
  • packages/core/src/command/consolidated/consolidated-create.ts (3 hunks)
  • packages/core/src/command/feature-flags/domain-ffs.ts (1 hunks)
  • packages/core/src/command/feature-flags/ffs-on-dynamodb.ts (1 hunks)
  • packages/core/src/command/feature-flags/types.ts (1 hunks)
  • packages/core/src/util/config.ts (1 hunks)
  • packages/data-transformation/fhir-to-csv/main.py (5 hunks)
  • packages/data-transformation/fhir-to-csv/src/utils/file.py (0 hunks)
  • packages/infra/lib/analytics-platform/analytics-platform-stack.ts (10 hunks)
  • packages/infra/lib/analytics-platform/types.ts (1 hunks)
  • packages/infra/lib/api-stack.ts (1 hunks)
  • packages/infra/lib/api-stack/api-service.ts (2 hunks)
  • packages/lambdas/src/analytics-platform/fhir-to-csv-bulk.ts (2 hunks)
  • packages/lambdas/src/shared/analytics-platform.ts (1 hunks)
  • packages/utils/src/analytics-platform/1-fhir-to-csv.ts (3 hunks)
  • packages/utils/src/analytics-platform/2-merge-csvs.ts (2 hunks)
💤 Files with no reviewable changes (3)
  • packages/data-transformation/fhir-to-csv/src/utils/file.py
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv/fhir-to-csv.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv/fhir-to-csv-factory.ts
🚧 Files skipped from review as they are similar to previous changes (17)
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-factory.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk.ts
  • packages/core/src/util/config.ts
  • packages/core/src/command/feature-flags/domain-ffs.ts
  • packages/api/src/routes/internal/analytics-platform/index.ts
  • packages/lambdas/src/shared/analytics-platform.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-transform.ts
  • packages/core/src/command/analytics-platform/incremental-ingestion.ts
  • packages/infra/lib/api-stack.ts
  • packages/core/src/command/feature-flags/types.ts
  • packages/lambdas/src/analytics-platform/fhir-to-csv-bulk.ts
  • packages/core/src/command/feature-flags/ffs-on-dynamodb.ts
  • packages/infra/lib/analytics-platform/types.ts
  • packages/core/src/command/analytics-platform/merge-csvs/index.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-direct.ts
  • packages/utils/src/analytics-platform/1-fhir-to-csv.ts
  • packages/infra/lib/api-stack/api-service.ts
🧰 Additional context used
📓 Path-based instructions (2)
**/*.{js,jsx,ts,tsx}

📄 CodeRabbit inference engine (.cursorrules)

**/*.{js,jsx,ts,tsx}: Don’t use null inside the app, only on code interacting with external interfaces/services, like DB and HTTP; convert to undefined before sending inwards into the code
Use const whenever possible
Use async/await instead of .then()
Naming: classes, enums: PascalCase
Naming: constants, variables, functions: camelCase
Naming: file names: kebab-case
Naming: Don’t use negative names, like notEnabled, prefer isDisabled
If possible, use decomposing objects for function parameters
Prefer Nullish Coalesce (??) than the OR operator (||) when you want to provide a default value
Avoid creating arrow functions
Use truthy syntax instead of in - i.e., if (data.link) not if ('link' in data)
While handling errors, keep the stack trace around: if you create a new Error (e.g., MetriportError), make sure to pass the original error as the new one’s cause so the stack trace is available upstream.
max column length is 100 chars
multi-line comments use /** */
top-level comments go after the import (save pre-import to basic file header, like license)
move literals to constants declared after imports when possible

Files:

  • packages/utils/src/analytics-platform/2-merge-csvs.ts
  • packages/core/src/command/consolidated/consolidated-create.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts
  • packages/infra/lib/analytics-platform/analytics-platform-stack.ts
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursorrules)

Use types whenever possible

**/*.{ts,tsx}: Prefer functional programming style functions: small, deterministic, single input and output
Avoid modifying objects received as parameters
Naming: classes and enums use PascalCase
Naming: constants, variables, and functions use camelCase
File names use kebab-case
Avoid negative names (e.g., prefer isDisabled to notEnabled)
Include units in numeric variable names when the type doesn’t convey the unit
TypeScript: use explicit types
TypeScript: prefer const over let
TypeScript: avoid any and casting from any to other types
TypeScript: use type predicates only for narrowing, not for full type coercion
TypeScript: prefer destructuring function parameters instead of multiple same-typed parameters
TypeScript: don’t use null inside the app; only at external boundaries and convert to undefined internally
TypeScript: use async/await instead of .then()
TypeScript: use strict equality ===; do not use ==
When calling a Promise-returning function without awaiting, chain .catch() to handle errors
Date/Time: always use buildDayjs() to create dayjs instances
Date/Time: prefer dayjs.duration(...) and keep values as duration
Prefer nullish coalescing (??) over OR (||) for default values
Avoid creating arrow functions
Use truthy property checks instead of the in operator (e.g., if (data.link))
Error handling: pass the original error as cause when wrapping errors
Error handling: use static error messages; put dynamic data in MetriportError.additionalInfo; error object last parameter
Global constants: move literals to constants after imports (avoid magic numbers)
Avoid multi-line logs: do not pass objects as the second parameter to console.log() or out().log()
Avoid multi-line logs: do not create multi-line strings with JSON.stringify()
Max column length is 100 characters
Use multi-line comments with /** */

Files:

  • packages/utils/src/analytics-platform/2-merge-csvs.ts
  • packages/core/src/command/consolidated/consolidated-create.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts
  • packages/infra/lib/analytics-platform/analytics-platform-stack.ts
🧠 Learnings (21)
📓 Common learnings
Learnt from: leite08
PR: metriport/metriport#4469
File: packages/fhir-converter/src/lib/workers/worker.js:75-87
Timestamp: 2025-08-26T02:33:26.777Z
Learning: User leite08 confirmed that the debug logging approach in packages/fhir-converter/src/lib/workers/worker.js was already reviewed and approved in the feature PR, even though it logs both error messages and stack traces.
Learnt from: leite08
PR: metriport/metriport#4013
File: packages/core/src/fhir-to-cda/cda-templates/components/procedures.ts:104-105
Timestamp: 2025-06-12T22:53:09.606Z
Learning: User leite08 prefers responses in English only.
Learnt from: leite08
PR: metriport/metriport#3991
File: packages/core/src/external/aws/s3.ts:621-630
Timestamp: 2025-06-10T22:20:21.203Z
Learning: User leite08 prefers that all review comments are written in English.
📚 Learning: 2025-08-27T21:24:19.963Z
Learnt from: leite08
PR: metriport/metriport#4350
File: packages/utils/src/analytics-platform/1-fhir-to-csv.ts:129-133
Timestamp: 2025-08-27T21:24:19.963Z
Learning: In packages/utils/src/analytics-platform/1-fhir-to-csv.ts, the SQS FIFO messageDeduplicationId is intentionally set to just patientId (not scoped to jobId) to prevent any duplicate patient processing across job runs and retries, as confirmed by leite08. This ensures strict patient-level idempotency in the analytics pipeline.

Applied to files:

  • packages/utils/src/analytics-platform/2-merge-csvs.ts
  • packages/core/src/command/consolidated/consolidated-create.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts
📚 Learning: 2025-08-20T12:40:34.100Z
Learnt from: leite08
PR: metriport/metriport#4350
File: packages/utils/src/analytics-platform/2-merge-csvs.ts:58-61
Timestamp: 2025-08-20T12:40:34.100Z
Learning: In packages/utils/src/analytics-platform/2-merge-csvs.ts, the constant messageGroupId "merge-csvs" for SQS FIFO processing is intentional by user leite08, indicating they want sequential processing rather than concurrent chunk processing for the merge CSV workflow.

Applied to files:

  • packages/utils/src/analytics-platform/2-merge-csvs.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts
  • packages/infra/lib/analytics-platform/analytics-platform-stack.ts
📚 Learning: 2025-09-01T19:57:53.644Z
Learnt from: RamilGaripov
PR: metriport/metriport#4470
File: packages/api/src/external/commonwell-v1/document/cw-to-fhir.ts:468-479
Timestamp: 2025-09-01T19:57:53.644Z
Learning: The file packages/api/src/external/commonwell-v1/document/cw-to-fhir.ts contains existing logic copied over from packages/api/src/external/fhir/document/index.ts, so issues in this file are out of scope for code review as they represent moved code rather than new functionality.

Applied to files:

  • packages/utils/src/analytics-platform/2-merge-csvs.ts
📚 Learning: 2025-05-20T21:26:26.804Z
Learnt from: leite08
PR: metriport/metriport#3814
File: packages/api/src/routes/internal/medical/patient-consolidated.ts:141-174
Timestamp: 2025-05-20T21:26:26.804Z
Learning: The functionality introduced in packages/api/src/routes/internal/medical/patient-consolidated.ts is planned to be refactored in downstream PR #3857, including improvements to error handling and validation.

Applied to files:

  • packages/utils/src/analytics-platform/2-merge-csvs.ts
  • packages/core/src/command/consolidated/consolidated-create.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts
📚 Learning: 2025-07-27T00:40:32.149Z
Learnt from: leite08
PR: metriport/metriport#4255
File: packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv/fhir-to-csv-direct.ts:19-20
Timestamp: 2025-07-27T00:40:32.149Z
Learning: User leite08 acknowledged the duplicate inputBundle spread issue in packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv/fhir-to-csv-direct.ts and will fix it in a follow-up PR rather than in the current patch release PR #4255.

Applied to files:

  • packages/utils/src/analytics-platform/2-merge-csvs.ts
  • packages/core/src/command/consolidated/consolidated-create.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts
📚 Learning: 2025-07-18T09:33:29.581Z
Learnt from: thomasyopes
PR: metriport/metriport#4165
File: packages/api/src/routes/internal/analytics-platform/index.ts:28-28
Timestamp: 2025-07-18T09:33:29.581Z
Learning: In packages/api/src/routes/internal/analytics-platform/index.ts, the patientId parameter handling is intentionally inconsistent: the `/fhir-to-csv` and `/fhir-to-csv/transform` endpoints require patientId (using getFromQueryOrFail) for single patient processing, while the `/fhir-to-csv/batch` endpoint treats patientId as optional (using getFromQuery) because it's designed to run over all patients when no specific patient ID is provided.

Applied to files:

  • packages/utils/src/analytics-platform/2-merge-csvs.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts
📚 Learning: 2025-05-26T16:24:10.245Z
Learnt from: leite08
PR: metriport/metriport#3859
File: packages/utils/src/consolidated/check-patient-consolidated.ts:28-32
Timestamp: 2025-05-26T16:24:10.245Z
Learning: For utility scripts in the packages/utils directory, developers are expected to update the main constants at the top of the file (like bundleFilePath, cxId, patientId) rather than implementing command-line argument parsing or environment variable reading. This is the preferred pattern for configuring utility scripts in this codebase.

Applied to files:

  • packages/utils/src/analytics-platform/2-merge-csvs.ts
📚 Learning: 2025-06-18T00:18:59.144Z
Learnt from: leite08
PR: metriport/metriport#4043
File: packages/core/src/command/consolidated/consolidated-create.ts:0-0
Timestamp: 2025-06-18T00:18:59.144Z
Learning: In the consolidated bundle creation process (packages/core/src/command/consolidated/consolidated-create.ts), the team prefers strict failure behavior where if any component (including pharmacy resources) fails to be retrieved, the entire consolidation should fail rather than gracefully degrading. This ensures data consistency and prevents incomplete bundles.

Applied to files:

  • packages/core/src/command/consolidated/consolidated-create.ts
📚 Learning: 2025-05-28T02:32:27.527Z
Learnt from: lucasdellabella
PR: metriport/metriport#3907
File: packages/core/src/external/fhir/adt-encounters.ts:118-143
Timestamp: 2025-05-28T02:32:27.527Z
Learning: In packages/core/src/external/fhir/adt-encounters.ts, conversion bundles stored by saveAdtConversionBundle do not require version ID assertions or versioning, unlike the sourced encounter data stored by putAdtSourcedEncounter which does require versionId validation.

Applied to files:

  • packages/core/src/command/consolidated/consolidated-create.ts
📚 Learning: 2025-05-31T21:58:28.502Z
Learnt from: leite08
PR: metriport/metriport#3940
File: packages/core/src/command/consolidated/search/fhir-resource/search-consolidated.ts:82-86
Timestamp: 2025-05-31T21:58:28.502Z
Learning: In packages/core/src/command/consolidated/search/fhir-resource/search-consolidated.ts, the mutable array operations using push() on resourcesMutable and hydratedMutable (lines 82-86 and 100-103) have been explicitly accepted as exceptions to the immutability guidelines after previous discussion.

Applied to files:

  • packages/core/src/command/consolidated/consolidated-create.ts
📚 Learning: 2025-04-16T00:25:25.196Z
Learnt from: RamilGaripov
PR: metriport/metriport#3676
File: packages/core/src/command/hl7v2-subscriptions/hl7v2-to-fhir-conversion/shared.ts:1-10
Timestamp: 2025-04-16T00:25:25.196Z
Learning: The circular dependency between shared.ts (importing getPatientIdsOrFail) and adt/utils.ts (using unpackPidFieldOrFail) will be addressed in a follow-up PR.

Applied to files:

  • packages/core/src/command/consolidated/consolidated-create.ts
📚 Learning: 2025-06-01T02:28:19.913Z
Learnt from: leite08
PR: metriport/metriport#3944
File: packages/core/src/command/consolidated/search/fhir-resource/ingest-lexical.ts:53-53
Timestamp: 2025-06-01T02:28:19.913Z
Learning: In packages/core/src/command/consolidated/search/fhir-resource/ingest-lexical.ts, the processErrors function intentionally throws MetriportError to bubble errors up the call stack rather than handling them locally. This is by design - errors from ingestPatientConsolidated should propagate upward rather than being caught at immediate calling locations.

Applied to files:

  • packages/core/src/command/consolidated/consolidated-create.ts
  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts
📚 Learning: 2025-06-19T22:44:49.393Z
Learnt from: thomasyopes
PR: metriport/metriport#4061
File: packages/api/src/external/ehr/shared/job/bundle/create-resource-diff-bundles/run-job.ts:34-47
Timestamp: 2025-06-19T22:44:49.393Z
Learning: In packages/api/src/external/ehr/shared/job/bundle/create-resource-diff-bundles/run-job.ts, the team prefers to keep refreshEhrBundles operations grouped using fire-and-forget pattern rather than awaiting all operations with Promise.allSettled(). This allows individual refresh operations to run independently without blocking the job runner.

Applied to files:

  • packages/core/src/command/consolidated/consolidated-create.ts
📚 Learning: 2025-07-18T09:53:38.906Z
Learnt from: thomasyopes
PR: metriport/metriport#4165
File: packages/data-transformation/fhir-to-csv/main.py:0-0
Timestamp: 2025-07-18T09:53:38.906Z
Learning: In packages/data-transformation/fhir-to-csv/main.py, S3 upload operations should be allowed to hard fail without try-catch blocks. The team prefers immediate failure over error handling and continuation for S3 upload operations in the FHIR-to-CSV transformation process.

Applied to files:

  • packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts
  • packages/data-transformation/fhir-to-csv/main.py
📚 Learning: 2025-05-28T19:23:20.179Z
Learnt from: keshavsaharia
PR: metriport/metriport#3885
File: packages/core/src/external/sftp/surescripts/client.ts:1-134
Timestamp: 2025-05-28T19:23:20.179Z
Learning: In packages/core/src/external/sftp/surescripts/client.ts, the standalone getPatientLoadFileName function intentionally omits the transmission ID from the filename, which differs from the class method getPatientLoadFileName. This difference in filename generation is expected behavior.

Applied to files:

  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts
📚 Learning: 2025-08-25T23:28:41.385Z
Learnt from: keshavsaharia
PR: metriport/metriport#4459
File: packages/core/src/external/quest/fhir/patient.ts:22-24
Timestamp: 2025-08-25T23:28:41.385Z
Learning: FHIR resources should have their ID field determined by `uuidv7()` generated UUIDs. The import should be: `import { uuidv7 } from "metriport/shared/util/uuid-v7";`. External system IDs should not be used directly as FHIR resource IDs, even when sanitized, but should instead be preserved in the identifier field for reference mapping.

Applied to files:

  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts
📚 Learning: 2025-05-08T19:41:36.533Z
Learnt from: thomasyopes
PR: metriport/metriport#3788
File: packages/api/src/external/ehr/shared/utils/bundle.ts:83-93
Timestamp: 2025-05-08T19:41:36.533Z
Learning: In the Metriport codebase, the team prefers to let errors bubble up naturally in some cases rather than adding explicit error handling at every layer, as demonstrated in the refreshEhrBundle function in the bundle.ts file.

Applied to files:

  • packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts
📚 Learning: 2025-06-23T16:48:12.930Z
Learnt from: leite08
PR: metriport/metriport#4073
File: packages/infra/lib/hl7-notification-stack/mllp.ts:62-71
Timestamp: 2025-06-23T16:48:12.930Z
Learning: In TypeScript files, always flag arrow functions as they violate the coding guidelines which state "Avoid creating arrow functions." This applies to both function declarations (const fn = () => {}) and callbacks within methods like reduce, map, filter, etc. Regular function declarations should be used instead.

Applied to files:

  • packages/infra/lib/analytics-platform/analytics-platform-stack.ts
📚 Learning: 2025-06-23T16:50:06.930Z
Learnt from: leite08
PR: metriport/metriport#4073
File: packages/shared/src/common/env-var.ts:20-27
Timestamp: 2025-06-23T16:50:06.930Z
Learning: In TypeScript files, avoid creating arrow functions and use regular function declarations instead. For example, instead of `export const myFunction = (param: string): string => { ... }`, use `export function myFunction(param: string): string { ... }`. This applies to all exported functions in the codebase.

Applied to files:

  • packages/infra/lib/analytics-platform/analytics-platform-stack.ts
📚 Learning: 2025-07-19T14:03:24.423Z
Learnt from: thomasyopes
PR: metriport/metriport#4165
File: packages/data-transformation/fhir-to-csv/Dockerfile.lambda:1-1
Timestamp: 2025-07-19T14:03:24.423Z
Learning: In packages/data-transformation/fhir-to-csv/Dockerfile.lambda, thomasyopes prefers to use AWS Lambda base image tags (like python:3.12) as documented by AWS rather than pinning to specific digests. The team follows AWS official documentation for Lambda base images.

Applied to files:

  • packages/infra/lib/analytics-platform/analytics-platform-stack.ts
🧬 Code graph analysis (5)
packages/utils/src/analytics-platform/2-merge-csvs.ts (1)
packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts (2)
  • buildFhirToCsvBulkJobPrefix (9-17)
  • parsePatientIdFromFhirToCsvBulkPatientPrefix (40-48)
packages/core/src/command/consolidated/consolidated-create.ts (2)
packages/core/src/command/analytics-platform/incremental-ingestion.ts (1)
  • ingestPatientIntoAnalyticsPlatform (5-27)
packages/core/src/util/error/shared.ts (1)
  • processAsyncError (57-71)
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts (2)
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk.ts (2)
  • FhirToCsvBulkHandler (9-11)
  • ProcessFhirToCsvBulkRequest (1-7)
packages/shared/src/net/retry.ts (1)
  • executeWithNetworkRetries (111-138)
packages/data-transformation/fhir-to-csv/main.py (1)
packages/data-transformation/fhir-to-csv/src/utils/file.py (1)
  • create_consolidated_key (3-4)
packages/infra/lib/analytics-platform/analytics-platform-stack.ts (3)
packages/infra/lib/shared/settings.ts (1)
  • QueueAndLambdaSettings (15-49)
packages/infra/config/env-config.ts (1)
  • EnvConfigNonSandbox (304-317)
packages/infra/lib/shared/lambda.ts (1)
  • createLambda (85-158)
🪛 Ruff (0.12.2)
packages/data-transformation/fhir-to-csv/main.py

109-109: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: check-pr / lint-build-test
  • GitHub Check: check-pr / lint-build-test
  • GitHub Check: check-pr / lint-build-test
  • GitHub Check: check-pr / lint-build-test
  • GitHub Check: check-pr / lint-build-test
  • GitHub Check: Analyze (javascript)
🔇 Additional comments (19)
packages/core/src/command/analytics-platform/fhir-to-csv/file-name.ts (1)

9-17: LGTM — job prefix composition reads well and matches new base.

packages/utils/src/analytics-platform/2-merge-csvs.ts (2)

166-173: LGTM — basePrefix and patientId parsing updated to Bulk helpers.

This matches the new S3 layout and keeps trailing “/” handling consistent with listFirstLevelSubdirectories.


5-7: Bulk helper imports verified; no old helper references remain.

packages/core/src/command/consolidated/consolidated-create.ts (2)

22-22: Import alias/readability — LGTM

Renaming to getPharmacyResources aligns with surrounding naming. No concerns.


28-28: Confirm out-of-band analytics ingestion isn’t prematurely cut off
createConsolidatedFromConversions is awaited in:

  • packages/utils/src/fhir/consolidate-patient-data.ts (long-running processing)
  • packages/core/src/command/consolidated/consolidated-filter.ts & consolidated-create.ts (CLI contexts)

Since ingestPatientIntoAnalyticsPlatform is fire-and-forget, verify that these short-lived processes (CLI/Lambda) remain alive until the ingestion dispatch completes or convert the call to an awaited promise.

packages/data-transformation/fhir-to-csv/main.py (4)

36-36: LGTM: consolidated bundle key derivation is clearer

Using create_consolidated_key centralizes the path shape and reduces drift.


108-109: LGTM: explicit validation for OUTPUT_PREFIX

Validation mirrors other required envs.


118-119: LGTM: passing through the provided prefix

Once sanitized, this keeps upload paths deterministic.


34-35: Sanitize output_file_prefix: Added output_file_prefix = output_file_prefix.rstrip("/") at the start of transform_and_upload_data; verified only one internal caller in main.py passes the argument—no other updates needed.

packages/infra/lib/analytics-platform/analytics-platform-stack.ts (6)

24-27: Interface addition looks good

Adding fhirToCsvBulk to AnalyticsPlatformsSettings aligns with the new two-lambda split.


84-87: settings() return shape is consistent

Returning fhirToCsvBulk alongside fhirToCsvTransform and mergeCsvs matches the new surface.


159-170: Transform lambda wiring LGTM

DockerImageFunction for the transform path, alarms, and bucket bindings are correct.


254-257: Bucket permissions look right

Transform gets read of medicalDocuments and read/write of analyticsPlatform as expected.


373-376: Merge CSVs concurrency is fine given FIFO + single messageGroupId

Acknowledging prior learning: merge-csvs uses a fixed messageGroupId to enforce sequential processing; maxConcurrency: 20 here won’t violate ordering.


239-244: ENV var name is correct in fhir-to-csv transform
The transform code in main.py reads process.env.ENV, so using ENV (not ENV_TYPE) here aligns with its implementation.

packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts (4)

4-6: Bulk handler/type alignment — LGTM.

Imports and class rename correctly align to the bulk contract and interface.


18-20: Dedup/group by patientId — LGTM and matches prior decision.

Using patientId for messageDeduplicationId and messageGroupId enforces strict per-patient idempotency and ordering, consistent with the documented approach in the analytics pipeline.


8-10: Verified FHIR_TO_CSV_BULK_QUEUE_URL wiring and FIFO configurationapi-service.ts injects FHIR_TO_CSV_BULK_QUEUE_URL and analytics-platform-stack.ts’s createQueue({…, fifo: true,…}) ensures the queue is FIFO.


12-22: Ignore timeoutInMillis here: neither executeWithNetworkRetries nor SQSClient support a per-call timeout (no timeoutInMillis or equivalent option in ExecuteWithNetworkRetriesOptions or SQSParameters).

Likely an incorrect or invalid review comment.

Comment on lines +12 to 14
async processFhirToCsv(params: ProcessFhirToCsvBulkRequest): Promise<void> {
const { patientId } = params;
const payload = JSON.stringify(params);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Guard against empty patientId (avoids SQS FIFO errors).

If patientId is empty/blank, SQS will reject messageGroupId/dedup IDs; fail fast.

Apply:

   async processFhirToCsv(params: ProcessFhirToCsvBulkRequest): Promise<void> {
-    const { patientId } = params;
+    const { patientId } = params;
+    if (!patientId?.trim()) {
+      throw new Error("patientId is required");
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async processFhirToCsv(params: ProcessFhirToCsvBulkRequest): Promise<void> {
const { patientId } = params;
const payload = JSON.stringify(params);
async processFhirToCsv(params: ProcessFhirToCsvBulkRequest): Promise<void> {
const { patientId } = params;
if (!patientId?.trim()) {
throw new Error("patientId is required");
}
const payload = JSON.stringify(params);
// …
}
🤖 Prompt for AI Agents
In
packages/core/src/command/analytics-platform/fhir-to-csv/command/fhir-to-csv-bulk/fhir-to-csv-bulk-cloud.ts
around lines 12–14, add a guard that validates params.patientId is present and
non-blank (e.g., check that typeof patientId === 'string' and
patientId.trim().length > 0) before JSON-stringifying payload or sending to SQS;
if the check fails, throw or return an error with a clear message (e.g.,
"Missing or empty patientId — cannot enqueue SQS FIFO message") so the function
fails fast and avoids SQS FIFO messageGroupId/deduplication errors.

readonly fhirToCsvLambda: lambda.DockerImageFunction;
readonly fhirToCsvQueue: Queue;
readonly fhirToCsvBulkLambda: lambda.DockerImageFunction;
readonly fhirToCsvBulkQueue: Queue;
readonly mergeCsvsLambda: lambda.DockerImageFunction;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix mergeCsvs types: created via createLambda (Function), not DockerImageFunction

Both the class field and the setup method return type should be lambda.Function.

-  readonly mergeCsvsLambda: lambda.DockerImageFunction;
+  readonly mergeCsvsLambda: lambda.Function;
@@
-  }): {
-    mergeCsvsLambda: lambda.DockerImageFunction;
-    queue: Queue;
-  } {
+  }): {
+    mergeCsvsLambda: lambda.Function;
+    queue: Queue;
+  } {

Also applies to: 332-334

🤖 Prompt for AI Agents
In packages/infra/lib/analytics-platform/analytics-platform-stack.ts around
lines 101 and 332-334, the mergeCsvs lambda is typed as
lambda.DockerImageFunction but it is created via createLambda which returns a
lambda.Function; change the class field type and the return/type annotations in
the setup method to lambda.Function (and update any related variables or method
signatures accordingly), and ensure the aws-lambda import/typing matches
lambda.Function so the types align with the actual created Function.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant