feature: tighten run_tasks_base #730
Important
Review skipped. Auto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI.

Walkthrough
The changes introduce new environment variables for default user configuration and a new workflow migration process. Several configuration modules received updates, including new methods and attributes for migration and LLM prompt handling. A new asynchronous task execution framework was added to refactor pipeline flows, while the search functionality now uses an updated type mapping. Additionally, multiple new evaluation pipelines for HotpotQA were implemented, and obsolete files and notebooks were removed. Minor adjustments to logging, error handling, and test cases were also applied.
Sequence Diagram(s)

sequenceDiagram
autonumber
participant Caller
participant RTB as run_tasks_base
participant HT as handle_task
participant GT as get_task_type
participant GE as get_task_executor
participant Exec as Executor (async/gen/coroutine/func)
Caller->>RTB: Invoke run_tasks_base(tasks, data, user)
RTB->>HT: For each task, call handle_task(task, args, leftover, batch_size, user)
HT->>GT: Determine task type
GT-->>HT: Return task type (e.g. async, generator, etc.)
HT->>GE: Fetch corresponding executor function
GE-->>HT: Return executor for task type
HT->>Exec: Execute task using executor function
Exec-->>HT: Yield results/errors
HT-->>RTB: Return task result
RTB-->>Caller: Return aggregated results
sequenceDiagram
autonumber
participant User
participant SS as specific_search
participant CR as CompletionRetriever
User->>SS: Submit search query with type "RAG_COMPLETION"
SS->>SS: Map RAG_COMPLETION to CompletionRetriever
SS->>CR: Invoke retrieval method
CR-->>SS: Return search results
SS-->>User: Deliver results
✅ There are no secrets present in this pull request anymore. If these secrets were true positives and are still valid, we highly recommend you revoke them.
🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.
Caution
Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments. If you are seeing this consistently it is likely a permissions issue. Please check "Moderation" -> "Code review limits" under your organization settings.
Actionable comments posted: 9
🧹 Nitpick comments (38)
cognee/infrastructure/llm/prompts/answer_simple_question_benchmark4.txt (1)
1-15: Review of Prompt Instructions Document
The new file clearly documents the standardized protocol for generating concise, context-based answers. The prompt instructions are well-structured, detailing:
- Minimalism: (Line 3) advising brevity.
- Question-Specific Responses: (Lines 4–8) with explicit instructions for handling yes/no, what/who/where, when, and how/why questions.
- Formatting Rules: (Lines 9–11) enforcing no punctuation and that responses be in lowercase.
- Context-Only Responses: (Line 12) ensuring that answers are derived solely from the provided context.
- Overall Protocol Meaning: (Line 14) summarizing the design goal for direct communication.
Suggestions:
- Consider adding a short header or file description at the very top (before line 1) to clarify the file’s purpose for future maintainers.
- Optionally remove or repurpose extra blank lines (such as after line 2) to enhance compactness and consistency with other benchmark prompt files.
Overall, the document is consistent and clear in its intent.
cognee/infrastructure/llm/prompts/answer_simple_question_benchmark2.txt (1)
2-7: Bullet Instruction Clarity and Style Suggestion
The bullet points effectively specify the response guidelines for various question types. However, the repeated use of a similar structure (each bullet beginning with "- For") may trigger style warnings and could benefit from slight rewording to enhance readability. Consider varying the sentence starters in some bullets to avoid stylistic repetition.
🧰 Tools
🪛 LanguageTool
[style] ~5-~5: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ...y with a single word or brief phrase. - For when questions: return only the relevan...(ENGLISH_WORD_REPEAT_BEGINNING_RULE)
[style] ~6-~6: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ...: return only the relevant date/time. - For how/why questions: use the briefest phr...(ENGLISH_WORD_REPEAT_BEGINNING_RULE)
cognee/infrastructure/llm/prompts/generate_graph_prompt_guided.txt (2)
10-15: Node Label Consistency – Wording Improvement
The guideline on label consistency is clear; however, the phrase on line 14 ("in a case of multiple words separated by whitespaces") could be rephrased for clarity. For example: "Each entity type should be singular, and if it consists of multiple words, they should be separated by a single space."
72-74: Inferred Facts Guidelines – Clarity Enhancement
The guideline on extracting inferred facts is valuable for enhancing the knowledge graph. To avoid ambiguity, consider adding a brief example or additional context on what qualifies as an inferred fact.

cognee/infrastructure/llm/prompts/generate_graph_prompt_strict.txt (2)
31-49: Clear Guidance on Dates, Numbers, and Properties
The sections on date formatting and numerical properties are detailed and unambiguous. The emphasis on using the "YYYY-MM-DD" format for dates and enforcing snake_case for properties adds necessary rigor.
Suggestion: Including an explicit example of a key-value extraction for a numerical value might further aid implementers.
89-89: Annotation Consistency
Line 89 is missing the tilde (~) annotation that is consistently used on the other added lines. Please update this line's annotation to maintain consistency across the file.

cognee/tasks/graph/extract_graph_from_data.py (1)
2-2: Unused import detected.
The Optional type is imported but not used anywhere in this file. This creates unnecessary clutter.

- from typing import Type, List, Optional
+ from typing import Type, List

🧰 Tools
🪛 Ruff (0.8.2)
2-2: typing.Optional imported but unused
Remove unused import: typing.Optional (F401)
cognee/infrastructure/llm/prompts/generate_graph_prompt_oneshot.txt (3)
30-32: Minor grammatical improvement opportunity.
Consider revising the wording for clarity:

- - Always use full, canonical names.
+ - Always use complete, canonical names.

🧰 Tools
🪛 LanguageTool
[grammar] ~30-~30: Did you mean the adjective “useful”?
Context: ...ived directly from the text. - Always use full, canonical names. - Do not use in...(THANK_FULL)
83-89: Date format consistency.
The date example is missing a comma after the year, which is inconsistent with the standard format:

-> **Input**: "Google was founded on September 4, 1998 and has a market cap of 800000000000."
+> **Input**: "Google was founded on September 4, 1998, and has a market cap of 800000000000."

🧰 Tools
🪛 LanguageTool
[style] ~83-~83: Some style guides suggest that commas should set off the year in a month-day-year date.
Context: ...**: "Google was founded on September 4, 1998 and has a market cap of 800000000000." ...(MISSING_COMMA_AFTER_YEAR)
127-132: Wording simplification opportunity.
The phrase "absolutely essential" could be simplified for clarity:

-- **Rule**: Avoid vague or empty edges (e.g., "X is a concept") unless absolutely essential.
+- **Rule**: Avoid vague or empty edges (e.g., "X is a concept") unless essential.

🧰 Tools
🪛 LanguageTool
[style] ~127-~127: ‘absolutely essential’ might be wordy. Consider a shorter alternative.
Context: ...y edges (e.g., "X is a concept") unless absolutely essential. ### 4.3 Inferred Facts - Rule: On...(EN_WORDINESS_PREMIUM_ABSOLUTELY_ESSENTIAL)
cognee/tasks/ingestion/migrate_relational_database.py (1)
148-168: Good implementation of duplicate edge removal.
The _remove_duplicate_edges function efficiently eliminates duplicate edges using a set to track unique edges. The implementation correctly handles the dictionary attributes by converting them to a hashable representation. Consider adding logging to track how many duplicates were removed:

def _remove_duplicate_edges(edge_mapping):
    seen = set()
    unique_original_shape = []
    for tup in edge_mapping:
        # We go through all the tuples in the edge_mapping and we only add unique tuples to the list
        # to eliminate duplicate edges.
        source_id, target_id, rel_name, rel_dict = tup
        # We need to convert the dictionary to a frozenset to be able to compare values for it
        rel_dict_hashable = frozenset(sorted(rel_dict.items()))
        hashable_tup = (source_id, target_id, rel_name, rel_dict_hashable)
        # We use the seen set to keep track of unique edges
        if hashable_tup not in seen:
            # A list that has frozenset elements instead of dictionaries is needed to be able to compare values
            seen.add(hashable_tup)
            # Append the original tuple shape (with the dictionary) if it's the first time we see it
            unique_original_shape.append(tup)
+   duplicates_removed = len(edge_mapping) - len(unique_original_shape)
+   if duplicates_removed > 0:
+       logger.info(f"Removed {duplicates_removed} duplicate edges")
    return unique_original_shape

cognee/base_config.py (1)
17-18: Ensure secure handling of default user credentials.
Storing user credentials in environment variables is generally acceptable, but make sure they aren't logged or exposed inadvertently. For enhanced security, consider using a secrets manager or vault-based approach.
cognee/modules/data/extraction/knowledge_graph/extract_content_graph.py (1)
1-1: Remove unused import.
The Optional import from typing is not being used in this file.

- from typing import Type, Optional
+ from typing import Type

🧰 Tools
🪛 Ruff (0.8.2)
1-1: typing.Optional imported but unused
Remove unused import: typing.Optional (F401)
cognee/api/v1/config/config.py (1)
134-147: Fix documentation in docstring for migration config method.
The docstring incorrectly refers to "relational db config" instead of "migration db config".

- Updates the relational db config with values from config_dict.
+ Updates the migration db config with values from config_dict.

Otherwise, the implementation correctly follows the pattern used in other configuration methods, which provides consistency in the codebase.
cognee/eval_framework/corpus_builder/run_corpus_builder.py (1)
3-3: Remove unused import
The Optional type is imported but not used in the code.

- from typing import List, Optional
+ from typing import List

🧰 Tools
🪛 Ruff (0.8.2)
3-3: typing.Optional imported but unused
Remove unused import: typing.Optional (F401)
cognee/infrastructure/databases/relational/config.py (2)
38-38: Consider using Optional[str] for readability.
Instead of Union[str, None], you may prefer Optional[str] from typing to make it more apparent that None is allowed and to align with typical Python conventions.

- migration_db_name: Union[str, None] = None
+ from typing import Optional
+ migration_db_name: Optional[str] = None
43-43: Apply the same convention for migration_db_provider.
Similar to the suggestion above, opt for Optional[str].

- migration_db_provider: Union[str, None] = None
+ migration_db_provider: Optional[str] = None

cognee/tests/test_cognee_server_start.py (2)
10-28: Enhance server startup reliability.
Relying on a fixed 20-second time.sleep may cause flakiness on slower machines or waste time on faster ones. Consider a loop to check readiness (e.g., repeatedly try connecting until the server responds or a timeout is reached).
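The readiness loop suggested above could look roughly like this (a sketch; the URL, timeout, and interval values are placeholders, not the test's actual configuration):

```python
import time
import urllib.error
import urllib.request


def wait_for_server(url, timeout=30.0, interval=0.5):
    """Poll `url` until the server answers or `timeout` seconds pass.

    Any HTTP response (even an error status) means the server is up;
    connection failures are retried until the deadline.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            urllib.request.urlopen(url, timeout=interval)
            return True
        except urllib.error.HTTPError:
            return True  # server is up, just returned a non-2xx status
        except (urllib.error.URLError, OSError):
            time.sleep(interval)
    return False
```

A test would call something like `wait_for_server("http://127.0.0.1:8000/health")` in place of the fixed sleep, returning as soon as the server is reachable.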
38-44: Graceful teardown approach is acceptable.
Using os.killpg with SIGTERM is typically safe. Just ensure the parent process group logic works properly on non-POSIX systems. In smaller test environments, a simple terminate() might suffice, but this approach is fine if you tested it cross-platform.

.github/README_WORKFLOW_MIGRATION.md (1)
30-92: Good manual migration guide.
The step-by-step approach to switch on: triggers to workflow_call: is straightforward. Consider adding a note on how to reference the new test-suites.yml file, if it's in a non-obvious location.

cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py (1)
128-151: Potential performance concern in per-data-point lookups.
The loop executes a SELECT for each data point to determine if it needs an update or insert. This may be inefficient for large batches. Consider a bulk approach (e.g., a single SELECT for all IDs, building a map, then updating or inserting accordingly) to reduce database roundtrips.

evals/README.md (3)
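The bulk-SELECT idea for the PGVector adapter can be sketched as a partition step: one query collects the IDs already stored, then the batch is split in memory (illustrative helper, not the adapter's actual API):

```python
def partition_for_upsert(data_points, existing_ids):
    """Split a batch into (to_update, to_insert) using a pre-fetched set
    of IDs, so the database sees one SELECT for the whole batch instead
    of one per data point."""
    to_update, to_insert = [], []
    for point in data_points:
        if point["id"] in existing_ids:
            to_update.append(point)
        else:
            to_insert.append(point)
    return to_update, to_insert
```

With SQLAlchemy, `existing_ids` would come from a single query along the lines of `select(Table.id).where(Table.id.in_(batch_ids))`.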
66-69: Remove unnecessary blank lines.
There are several empty lines before the "Human Evaluation" section that disrupt the document flow. Consider removing them to maintain consistent spacing throughout the document.
-
-
-
  #### Human Evaluation
85-85: Fix grammar in the limitations list.
The sentence has a grammar issue that affects readability.

- LLM as a judge metrics are not reliable measure and can indicate the overall accuracy
+ LLM as a judge metrics are not a reliable measure and can only approximate the overall accuracy
87-87: Hyphenate the compound adjective "labor-intensive".
According to English style conventions, compound adjectives should be hyphenated when they precede a noun.

- Human as a judge is labor intensive and does not scale
+ Human as a judge is labor-intensive and does not scale

🧰 Tools
🪛 LanguageTool
[misspelling] ~87-~87: This word is normally spelled with a hyphen.
Context: ...memory evaluation - Human as a judge is labor intensive and does not scale - Hotpot is not the ...(EN_COMPOUNDS_LABOR_INTENSIVE)
cognee/tests/test_relational_db_migration.py (2)
111-111: Rename unused loop variable.
The loop control variable edge_data is not used within the loop body, which is flagged by static analysis.

- for src, tgt, key, edge_data in edges:
+ for src, tgt, key, _edge_data in edges:

🧰 Tools
🪛 Ruff (0.8.2)
111-111: Loop control variable edge_data not used within loop body
Rename unused edge_data to _edge_data (B007)
165-169: Replace magic numbers with named constants.
The assertions use hardcoded numbers for expected node and edge counts, which makes the tests brittle and harder to maintain.

+ # Define expected counts at the top of the file or in a configuration
+ SQLITE_EXPECTED_NODE_COUNT = 227
+ SQLITE_EXPECTED_EDGE_COUNT = 580
+ POSTGRES_EXPECTED_NODE_COUNT = 115
+ POSTGRES_EXPECTED_EDGE_COUNT = 356

# Then use these constants in the assertions
- assert node_count == 227, f"Expected 227 nodes, got {node_count}"
- assert edge_count == 580, f"Expected 580 edges, got {edge_count}"
+ assert node_count == SQLITE_EXPECTED_NODE_COUNT, f"Expected {SQLITE_EXPECTED_NODE_COUNT} nodes, got {node_count}"
+ assert edge_count == SQLITE_EXPECTED_EDGE_COUNT, f"Expected {SQLITE_EXPECTED_EDGE_COUNT} edges, got {edge_count}"

# And later for PostgreSQL
- assert node_count == 115, f"Expected 115 nodes, got {node_count}"
- assert edge_count == 356, f"Expected 356 edges, got {edge_count}"
+ assert node_count == POSTGRES_EXPECTED_NODE_COUNT, f"Expected {POSTGRES_EXPECTED_NODE_COUNT} nodes, got {node_count}"
+ assert edge_count == POSTGRES_EXPECTED_EDGE_COUNT, f"Expected {POSTGRES_EXPECTED_EDGE_COUNT} edges, got {edge_count}"

Also applies to: 196-200
evals/mem0_01042025/hotpot_qa_mem0.py (2)
28-28: Rename unused loop variable.
The loop control variable i is not used within the loop body.

- for i, document in enumerate(tqdm(corpus, desc="Adding documents")):
+ for _, document in enumerate(tqdm(corpus, desc="Adding documents")):

🧰 Tools
🪛 Ruff (0.8.2)
28-28: Loop control variable i not used within loop body
Rename unused i to _i (B007)
71-72: Consider handling empty search results.
The code doesn't check if the memory search returns empty results before trying to format them.

- relevant_memories = memory.search(query=question, user_id=user_id, limit=5)
- memories_str = "\n".join(f"- {entry['memory']}" for entry in relevant_memories["results"])
+ relevant_memories = memory.search(query=question, user_id=user_id, limit=5)
+ if not relevant_memories.get("results"):
+     memories_str = "No relevant context found."
+ else:
+     memories_str = "\n".join(f"- {entry['memory']}" for entry in relevant_memories["results"])

evals/plot_metrics.py (2)
9-20: Robust confidence interval calculation.
This method correctly uses stats.t.interval for computing the confidence interval. Consider handling edge cases like NaN values or infinities in accuracies if your data source can produce malformed values.
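A guarded variant could filter non-finite values first. This stdlib-only sketch uses a normal approximation rather than `stats.t.interval`, so it illustrates the edge-case handling, not a drop-in replacement for the existing method:

```python
import math
from statistics import mean, stdev


def confidence_interval(accuracies, z=1.96):
    """Return (low, high) for the mean of `accuracies`, ignoring NaN
    and infinities.

    z=1.96 approximates a 95% interval; scipy's stats.t.interval is the
    more precise choice for small samples.
    """
    clean = [a for a in accuracies if math.isfinite(a)]
    if len(clean) < 2:
        raise ValueError("need at least two finite values")
    center = mean(clean)
    half_width = z * stdev(clean) / math.sqrt(len(clean))
    return center - half_width, center + half_width
```

Raising on an all-malformed input makes the failure explicit instead of silently plotting a meaningless interval.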
62-175: Extensive logic for loading specialized metrics.
The conditional checks for system-specific metrics (Graphiti, Mem0, Cognee) are well-structured. However, the nested conditions are somewhat verbose. You might consider centralizing the logic in a mapping dict to reduce repetitive code.
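The mapping-dict refactor could look roughly like this (the system names come from the review; the loader functions are hypothetical placeholders, not the script's actual helpers):

```python
def load_graphiti_metrics(path):
    # Placeholder: each loader would parse that system's metrics file.
    return {"system": "graphiti", "path": path}


def load_mem0_metrics(path):
    return {"system": "mem0", "path": path}


def load_cognee_metrics(path):
    return {"system": "cognee", "path": path}


METRIC_LOADERS = {
    "graphiti": load_graphiti_metrics,
    "mem0": load_mem0_metrics,
    "cognee": load_cognee_metrics,
}


def load_metrics(system, path):
    """Dispatch to the right loader instead of nested if/elif blocks."""
    try:
        return METRIC_LOADERS[system](path)
    except KeyError:
        raise ValueError(f"unknown system: {system}") from None
```

Adding a new system then becomes a one-line registration in `METRIC_LOADERS` rather than another branch.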
Additionally, line 210 iterates over all_systems_metrics.keys(). This could be streamlined:

- for system in all_systems_metrics.keys():
+ for system in all_systems_metrics:

cognee/eval_framework/corpus_builder/task_getters/get_default_tasks_by_indices.py (2)
33-46: Combining base tasks with graph-related tasks.
Returning [graph_task, add_data_points_task] in addition to the base tasks is straightforward and clear. If user is essential to these tasks, confirm its usage or future usage in subsequent references.
49-58: Core chunking tasks plus data points addition.
Similar to get_no_summary_tasks, user is introduced but not used within the function body. If it's truly optional, consider documenting that in the docstring for clarity.

evals/falkor_01042025/hotpot_qa_falkor_graphrag_sdk.py (2)
1-9
: Unused import detected.
URL from graphrag_sdk.source is never used (line 5). Consider removing it to keep imports clean:

- from graphrag_sdk.source import URL, STRING
+ from graphrag_sdk.source import STRING

🧰 Tools
🪛 Ruff (0.8.2)
5-5: graphrag_sdk.source.URL imported but unused
Remove unused import: graphrag_sdk.source.URL (F401)
35-64: Ontology creation logic seems solid.
The function creates an ontology from source data and writes it to JSON. Good job providing print statements for user awareness. Consider robust error handling if the corpus file has unexpected structure.
evals/graphiti_01042025/hotpot_qa_graphiti.py (3)
9-9: Remove unused import
The from openai import OpenAI import is never referenced. Removing it will keep the code clean and avoid confusion.

- from openai import OpenAI
🧰 Tools
🪛 Ruff (0.8.2)
9-9: openai.OpenAI imported but unused
Remove unused import: openai.OpenAI (F401)
23-24: Add robust file handling
Reading JSON files without error handling can lead to runtime failures if files are missing or malformed. Safely handle file I/O and JSON parsing errors to improve resilience.

try:
    with open(corpus_file, "r", encoding="utf-8") as file:
        corpus = json.load(file)
except (FileNotFoundError, json.JSONDecodeError) as e:
    print(f"Error loading {corpus_file}: {e}")
    return

Also applies to: 61-62, 118-119
94-95: Handle potential LLM call failures
For calls to llm.ainvoke, consider adding retries or exception handling to manage rate limits, network errors, or unexpected responses from the LLM.

cognee/modules/pipelines/operations/run_tasks_base.py (1)
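The retry idea for the llm.ainvoke calls might be sketched as a small async wrapper (hedged: the backoff constants are arbitrary, the broad `except Exception` should be narrowed to the client's actual error types, and a library such as tenacity is the fuller solution):

```python
import asyncio


async def call_with_retries(make_call, attempts=3, base_delay=1.0):
    """Await `make_call()` up to `attempts` times with exponential
    backoff, re-raising the last error if every attempt fails."""
    last_error = None
    for attempt in range(attempts):
        try:
            return await make_call()
        except Exception as error:  # narrow to rate-limit/network errors
            last_error = error
            if attempt < attempts - 1:
                await asyncio.sleep(base_delay * 2**attempt)
    raise last_error
```

A call site would pass a coroutine factory, e.g. `await call_with_retries(lambda: llm.ainvoke(prompt))`, so each retry gets a fresh coroutine.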
18-42: Check for potential recursion depth with large tasks
The approach of yielding results via run_tasks_base within an async generator is elegant. However, if there are many leftover tasks, repeated calls could accumulate recursive iterations. Ensure that the sequence of tasks is finite and that you won't exceed recursion limits or have performance bottlenecks.
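If recursion depth ever becomes a concern, the leftover-task recursion can be flattened into a plain loop. This is a simplified sketch of the idea, not the project's actual Task interface:

```python
import asyncio


async def run_tasks_iteratively(tasks, data):
    """Pass `data` through each task in sequence with an explicit loop,
    so stack depth stays constant regardless of how many tasks are
    queued."""
    result = data
    for task in tasks:
        result = await task(result)
    return result
```

The recursive form recurses once per leftover task, so its depth grows with the pipeline length; the loop keeps depth constant at the cost of handling batching and generators explicitly.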
🛑 Comments failed to post (9)
cognee/shared/logging_utils.py (1)
216-217:
⚠️ Potential issue
Critical syntax error in hasattr check.
The
__name__
reference is incorrect - it's trying to use the value of__name__
variable rather than checking for the string literal"__name__"
. This will cause a NameError at runtime.- if hasattr(exc_type, __name__): + if hasattr(exc_type, "__name__"): event_dict["exception_type"] = exc_type.__name__📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.if hasattr(exc_type, "__name__"): event_dict["exception_type"] = exc_type.__name__
.github/workflows/disable_independent_workflows.sh (1)
4-4:
⚠️ Potential issue
Add error handling to directory change.
The
cd
command should include error handling to prevent the script from continuing execution in the wrong directory if the command fails.-cd "$(dirname "$0")" +cd "$(dirname "$0")" || exit 1📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.cd "$(dirname "$0")" || exit 1
🧰 Tools
🪛 Shellcheck (0.10.0)
[warning] 4-4: Use 'cd ... || exit' or 'cd ... || return' in case cd fails.
(SC2164)
cognee/tests/test_telemetry.py (1)
44-44:
⚠️ Potential issue
Fix misleading comment about environment setting.
The comment suggests setting the environment to "dev" but the code sets it to "prod", which is contradictory.
- os.environ["ENV"] = "prod" # Set to dev to ensure telemetry is sent + os.environ["ENV"] = "prod" # Set to prod to ensure telemetry is sent📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.os.environ["ENV"] = "prod" # Set to prod to ensure telemetry is sent
cognee/tests/test_relational_db_migration.py (1)
221-221: 🛠️ Refactor suggestion
Add more detailed instructions for PostgreSQL test setup.
The comment mentions running a SQL script but lacks specifics on where to find it or how to run it.
- # To run test manually you first need to run the Chinook_PostgreSql.sql script in the test_data directory + # To run this test manually: + # 1. Ensure PostgreSQL is running and accessible with the credentials specified below + # 2. Run the following command from the project root: + # psql -U cognee -d test_migration_db -h 127.0.0.1 -f cognee/tests/test_data/Chinook_PostgreSql.sql📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.# To run this test manually: # 1. Ensure PostgreSQL is running and accessible with the credentials specified below # 2. Run the following command from the project root: # psql -U cognee -d test_migration_db -h 127.0.0.1 -f cognee/tests/test_data/Chinook_PostgreSql.sql
evals/mem0_01042025/hotpot_qa_mem0.py (4)
53-54: 🛠️ Refactor suggestion
Add error handling for QA pairs file.
Similar to the corpus loading, there's no error handling for loading the QA pairs file.
- with open(qa_pairs_file, "r") as file: - qa_pairs = json.load(file) + try: + with open(qa_pairs_file, "r") as file: + qa_pairs = json.load(file) + except FileNotFoundError: + print(f"Error: QA pairs file '{qa_pairs_file}' not found.") + return [] + except json.JSONDecodeError: + print(f"Error: QA pairs file '{qa_pairs_file}' contains invalid JSON.") + return []📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.try: with open(qa_pairs_file, "r") as file: qa_pairs = json.load(file) except FileNotFoundError: print(f"Error: QA pairs file '{qa_pairs_file}' not found.") return [] except json.JSONDecodeError: print(f"Error: QA pairs file '{qa_pairs_file}' contains invalid JSON.") return []
111-136: 🛠️ Refactor suggestion
Add error handling in main function.
The main function should handle potential exceptions from the pipeline to prevent crashes.
- def main(config): - """Main function for HotpotQA memory pipeline.""" - print("Starting HotpotQA memory pipeline...") - print(f"Configuration: {config}") - - # Initialize clients - memory = Memory() - openai_client = OpenAI() - - # Load corpus to memory - load_corpus_to_memory( - memory=memory, - corpus_file=config.corpus_file, - user_id=config.user_id, - limit=config.corpus_limit, - ) - - # Answer questions - print(f"Answering questions from {config.qa_pairs_file}...") - answer_questions( - memory=memory, - openai_client=openai_client, - model_name=config.model_name, - user_id=config.user_id, - qa_pairs_file=config.qa_pairs_file, - print_results=config.print_results, - output_file=config.results_file, - limit=config.qa_limit, - ) - - print(f"Results saved to {config.results_file}") - print("Pipeline completed successfully") + def main(config): + """Main function for HotpotQA memory pipeline.""" + try: + print("Starting HotpotQA memory pipeline...") + print(f"Configuration: {config}") + + # Initialize clients + memory = Memory() + openai_client = OpenAI() + + # Load corpus to memory + load_corpus_to_memory( + memory=memory, + corpus_file=config.corpus_file, + user_id=config.user_id, + limit=config.corpus_limit, + ) + + # Answer questions + print(f"Answering questions from {config.qa_pairs_file}...") + answer_questions( + memory=memory, + openai_client=openai_client, + model_name=config.model_name, + user_id=config.user_id, + qa_pairs_file=config.qa_pairs_file, + print_results=config.print_results, + output_file=config.results_file, + limit=config.qa_limit, + ) + + print(f"Results saved to {config.results_file}") + print("Pipeline completed successfully") + except Exception as e: + print(f"Error in pipeline execution: {e}")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.def main(config): """Main function for HotpotQA memory pipeline.""" try: print("Starting HotpotQA memory pipeline...") print(f"Configuration: {config}") # Initialize clients memory = Memory() openai_client = OpenAI() # Load corpus to memory load_corpus_to_memory( memory=memory, corpus_file=config.corpus_file, user_id=config.user_id, limit=config.corpus_limit, ) # Answer questions print(f"Answering questions from {config.qa_pairs_file}...") answer_questions( memory=memory, openai_client=openai_client, model_name=config.model_name, user_id=config.user_id, qa_pairs_file=config.qa_pairs_file, print_results=config.print_results, output_file=config.results_file, limit=config.qa_limit, ) print(f"Results saved to {config.results_file}") print("Pipeline completed successfully") except Exception as e: print(f"Error in pipeline execution: {e}")
18-38: 🛠️ Refactor suggestion
Add error handling for file operations.
The function lacks error handling for file operations, which could fail if the corpus file doesn't exist or has invalid JSON.
- with open(corpus_file, "r") as file: - corpus = json.load(file) + try: + with open(corpus_file, "r") as file: + corpus = json.load(file) + except FileNotFoundError: + print(f"Error: Corpus file '{corpus_file}' not found.") + return memory + except json.JSONDecodeError: + print(f"Error: Corpus file '{corpus_file}' contains invalid JSON.") + return memory📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.print(f"Loading corpus from {corpus_file}...") try: with open(corpus_file, "r") as file: corpus = json.load(file) except FileNotFoundError: print(f"Error: Corpus file '{corpus_file}' not found.") return memory except json.JSONDecodeError: print(f"Error: Corpus file '{corpus_file}' contains invalid JSON.") return memory # Apply limit if specified if limit is not None: corpus = corpus[:limit] print(f"Limited to first {limit} documents") print(f"Adding {len(corpus)} documents to memory...") for i, document in enumerate(tqdm(corpus, desc="Adding documents")): # Create a conversation that includes the document content messages = [ {"role": "system", "content": "This is a document to remember."}, {"role": "user", "content": "Please remember this document."}, {"role": "assistant", "content": document}, ] memory.add(messages, user_id=user_id) print("All documents added to memory") return memory
🧰 Tools
🪛 Ruff (0.8.2)
28-28: Loop control variable
i
not used within loop bodyRename unused
i
to_i
(B007)
98-100: 🛠️ Refactor suggestion
Add error handling for writing results.
The code should handle potential errors when writing to the output file.
- with open(output_file, "w", encoding="utf-8") as file: - json.dump(results, file, indent=2) + try: + with open(output_file, "w", encoding="utf-8") as file: + json.dump(results, file, indent=2) + except IOError as e: + print(f"Error: Failed to write results to '{output_file}': {e}")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.print(f"Saving results to {output_file}...") try: with open(output_file, "w", encoding="utf-8") as file: json.dump(results, file, indent=2) except IOError as e: print(f"Error: Failed to write results to '{output_file}': {e}")
cognee/modules/pipelines/operations/run_tasks_base.py (1)
117-169:
⚠️ Potential issue
Guard against a potential
None
userIn
handle_task
, the code sends telemetry usinguser.id
, but there is no check ifuser
isNone
. Ifuser
is optional, referencinguser.id
will cause errors. Consider a safeguard or a fallback mechanism.- user_id=user.id, + user_id=user.id if user else None,📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.async def handle_task( running_task: Task, args: list, leftover_tasks: list[Task], next_task_batch_size: int, user: User, ): """Handle common task workflow with logging, telemetry, and error handling around the core execution logic.""" # Get task information using the helper functions task_type = get_task_type(running_task) executor = get_task_executor(task_type) # Determine executor args based on task type execute_args = (args, leftover_tasks) if task_type in ["Async Generator", "Generator"]: execute_args += (next_task_batch_size,) logger.info(f"{task_type} task started: `{running_task.executable.__name__}`") send_telemetry( f"{task_type} Task Started", user_id=user.id if user else None, additional_properties={ "task_name": running_task.executable.__name__, }, ) try: # Add user to the execute args complete_args = execute_args + (user,) async for result in executor(running_task, *complete_args): yield result logger.info(f"{task_type} task completed: `{running_task.executable.__name__}`") send_telemetry( f"{task_type} Task Completed", user_id=user.id if user else None, additional_properties={ "task_name": running_task.executable.__name__, }, ) except Exception as error: logger.error( f"{task_type} task errored: `{running_task.executable.__name__}`\n{str(error)}\n", exc_info=True, ) send_telemetry( f"{task_type} Task Errored", user_id=user.id if user else None, additional_properties={ "task_name": running_task.executable.__name__, }, ) raise error
I also fixed a little bug with the TaskGetters.
Great job @lxobr!
Description
DCO Affirmation
I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin.