54 changes: 54 additions & 0 deletions .github/workflows/e2e_eval_aime24.yml
@@ -0,0 +1,54 @@
name: e2e_eval_aime24

on:
  # Trigger the workflow on push or pull request,
  # but only for the main and v0.2.x branches
push:
branches:
- main
- v0.2.x
paths:
- "**/*.py"
- .github/workflows/e2e_eval_aime24.yml
pull_request:
branches:
- main
- v0.2.x
paths:
- "**/*.py"
- "verl/trainer/config/*.yaml"
- .github/workflows/e2e_eval_aime24.yml
- "tests/e2e/*.sh"

# Declare read-only permissions for repository contents.
permissions:
contents: read

jobs:
e2e_eval_aime24:
runs-on: [self-hosted, l20-1]
timeout-minutes: 40 # Increase this timeout value as needed
env:
HTTP_PROXY: ${{ secrets.PROXY_HTTP }}
HTTPS_PROXY: ${{ secrets.PROXY_HTTPS }}
NO_PROXY: "localhost,127.0.0.1"
HF_HUB_ENABLE_HF_TRANSFER: 1
container:
image: hiyouga/verl:ngc-th2.6.0-cu120-vllm0.8.2
options: --gpus all --shm-size=10g
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
fetch-depth: 0
- name: Install the current repository
run: |
pip3 install hf_transfer
pip3 install -e .[test,gpu,math]
- name: Prepare aime24 dataset
run: |
ray stop --force
python3 recipe/r1/data_process.py --task aime2024
      - name: Run generation and evaluation on AIME 2024
run: |
ray stop --force
bash tests/e2e/run_r1_distill_qwen_aime24_eval.sh
13 changes: 13 additions & 0 deletions recipe/r1/__init__.py
@@ -0,0 +1,13 @@
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
10 changes: 10 additions & 0 deletions recipe/r1/config/evaluation.yaml
@@ -0,0 +1,10 @@
data:
path: /tmp/math_Qwen2-7B-Instruct.parquet
prompt_key: prompt
response_key: responses
data_source_key: data_source
reward_model_key: reward_model

custom_reward_function:
path: null
name: compute_score
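The keys above name the columns that main_eval.py reads from the generation output parquet: the prompt, a responses column holding the list of N sampled answers per row, the data_source string, and a reward_model dict that carries the ground truth. A minimal sketch of a file in that shape, with toy values chosen only for illustration:

import pandas as pd

# Toy generation output matching the column names in evaluation.yaml.
df = pd.DataFrame({
    "data_source": ["Maxwell-Jia/AIME_2024"],
    "prompt": [[{"role": "user", "content": "What is 1 + 1?"}]],
    "responses": [["The answer is 2.", "I believe it is 2."]],  # N samples per prompt
    "reward_model": [{"style": "rule", "ground_truth": "2"}],
})
df.to_parquet("/tmp/math_Qwen2-7B-Instruct.parquet")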
210 changes: 210 additions & 0 deletions recipe/r1/data_process.py
@@ -0,0 +1,210 @@
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Preprocess the dataset to parquet format
"""

import argparse
import os
from functools import partial

from datasets import concatenate_datasets, load_dataset

from verl.utils.hdfs_io import copy, makedirs


def example_map_fn(example, idx, process_fn, data_source, ability, split):
question, solution = process_fn(example)
data = {
"data_source": data_source,
"prompt": [{
"role": "user",
"content": question
}],
"ability": ability,
"reward_model": {
"style": "rule",
"ground_truth": solution
},
"extra_info": {
'split': split,
'index': idx
}
}
return data


def build_aime2024_dataset():

def process_aime2024(example):
return example["Problem"], str(example["Answer"])

data_source = 'Maxwell-Jia/AIME_2024'
print(f"Loading the {data_source} dataset from huggingface...", flush=True)
dataset = load_dataset(data_source, split="train")
map_fn = partial(example_map_fn,
process_fn=process_aime2024,
data_source=data_source,
ability="English",
split="test")
dataset = dataset.map(map_fn, with_indices=True, remove_columns=dataset.column_names)
return dataset


def build_gpqa_diamond_dataset():
import random
GPQA_QUERY_TEMPLATE = "Answer the following multiple choice question. The last line of your response should be of the following format: 'Answer: $LETTER' (without quotes) where LETTER is one of ABCD. Think step by step before answering.\n\n{Question}\n\nA) {A}\nB) {B}\nC) {C}\nD) {D}"

def process_gpqa_diamond(example):
choices = [example["Incorrect Answer 1"], example["Incorrect Answer 2"], example["Incorrect Answer 3"]]
random.shuffle(choices)
gold_index = random.randint(0, 3)
choices.insert(gold_index, example["Correct Answer"])
query_prompt = GPQA_QUERY_TEMPLATE.format(A=choices[0],
B=choices[1],
C=choices[2],
D=choices[3],
Question=example["Question"])
gold_choice = "ABCD"[gold_index]
return query_prompt, gold_choice

data_source = 'Idavidrein/gpqa'
print(f"Loading the {data_source} dataset from huggingface...", flush=True)

dataset = load_dataset(data_source, "gpqa_diamond", split="train")
map_fn = partial(example_map_fn,
process_fn=process_gpqa_diamond,
data_source=data_source,
ability="Math",
split="test")
dataset = dataset.map(map_fn, with_indices=True, remove_columns=dataset.column_names)
return dataset


def build_cnmo2024_dataset():

def process_cnmo2024(example):
return example["question"], example["answer"]

data_source = 'opencompass/LiveMathBench'
print(f"Loading the {data_source} dataset from huggingface...", flush=True)

dataset_en = load_dataset(data_source, "v202412_CNMO_en", split="test")
map_fn_en = partial(example_map_fn,
process_fn=process_cnmo2024,
data_source='opencompass/cnmo2024_en',
ability="Math",
split="test")
dataset_en = dataset_en.map(map_fn_en, with_indices=True, remove_columns=dataset_en.column_names)

dataset_zh = load_dataset(data_source, "v202412_CNMO_cn", split="test")
map_fn_zh = partial(example_map_fn,
process_fn=process_cnmo2024,
data_source='opencompass/cnmo2024_zh',
ability="Math",
split="test")
dataset_zh = dataset_zh.map(map_fn_zh, with_indices=True, remove_columns=dataset_zh.column_names)

dataset = concatenate_datasets([dataset_en, dataset_zh])
return dataset


def build_livecodebench_dataset():
import json, pickle, zlib, base64

def process_livecodebench(example):
# Construct Query Prompt
# From https://github.com/LiveCodeBench/LiveCodeBench/blob/998c52d394b836f15fff3b9a29866191108ff81b/lcb_runner/prompts/code_generation.py#L140
query_prompt = (
"You will be given a question (problem specification) and will generate a correct Python program that matches the specification and passes all tests.\n\n"
f"Question: {example['question_content']}\n\n")
if example["starter_code"]:
query_prompt += (
"You will use the following starter code to write the solution to the problem and enclose your code within delimiters.\n"
f"```python\n{example['starter_code']}\n```")
else:
query_prompt += (
"Read the inputs from stdin solve the problem and write the answer to stdout (do not directly test on the sample inputs). Enclose your code within delimiters as follows. Ensure that when the python program runs, it reads the inputs, runs the algorithm and writes output to STDOUT."
f"```python\n# YOUR CODE HERE\n```")

# Construct test cases
public_test_cases = json.loads(example["public_test_cases"])
try:
private_test_cases = json.loads(example["private_test_cases"])
        except Exception:
private_test_cases = json.loads(
pickle.loads(zlib.decompress(base64.b64decode(example["private_test_cases"].encode("utf-8")))))
full_test_cases = public_test_cases + private_test_cases

metadata = json.loads(example["metadata"])
test_cases = {
"inputs": [t["input"] for t in full_test_cases],
"outputs": [t["output"] for t in full_test_cases],
"fn_name": metadata.get("func_name", None),
}
        test_cases_compressed = base64.b64encode(zlib.compress(pickle.dumps(json.dumps(test_cases)))).decode("utf-8")
        return query_prompt, test_cases_compressed

data_source = 'livecodebench/code_generation_lite'
print(f"Loading the {data_source} dataset from huggingface...", flush=True)
dataset = load_dataset(data_source, split="test")
    # R1 evaluation uses LiveCodeBench problems from 2024-08 through 2025-01
dataset = dataset.filter(lambda line: "2024-08-00T00:00:00" <= line["contest_date"] < "2025-01-00T00:00:00")
map_fn = partial(example_map_fn,
process_fn=process_livecodebench,
data_source=data_source,
ability="Code",
split="test")

dataset = dataset.map(map_fn, with_indices=True, remove_columns=dataset.column_names, num_proc=8)
return dataset


TASK2DATA = {
"aime2024": build_aime2024_dataset,
"gpqa_diamond": build_gpqa_dimond_dataset,
"cnmo2024": build_cnmo2024_dataset,
"livecodebench": build_livecodebench_dataset,
}
SUPPORTED_TASKS = TASK2DATA.keys()

if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--local_dir', default='~/data/r1')
parser.add_argument('--hdfs_dir', default=None)
parser.add_argument('--tasks', default="all")

args = parser.parse_args()

if args.tasks.lower() == "all":
args.tasks = SUPPORTED_TASKS
else:
args.tasks = [task.strip() for task in args.tasks.split(',') if task.strip()]
for task in args.tasks:
if task not in SUPPORTED_TASKS:
                raise NotImplementedError(f"{task} is not supported.")

datasets = []
for task in args.tasks:
datasets.append(TASK2DATA[task]())
test_dataset = concatenate_datasets(datasets)

local_dir = args.local_dir
hdfs_dir = args.hdfs_dir

test_dataset.to_parquet(os.path.join(local_dir, 'test.parquet'))

if hdfs_dir is not None:
makedirs(hdfs_dir)

copy(src=local_dir, dst=hdfs_dir)
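Every task builder routes its raw rows through example_map_fn, so all processed records share one schema. A minimal sketch of that schema using a made-up AIME-style row (it assumes the recipe package is importable from the repository root):

from recipe.r1.data_process import example_map_fn

# Hypothetical raw row in the Maxwell-Jia/AIME_2024 format.
raw = {"Problem": "What is 1 + 1?", "Answer": 2}

record = example_map_fn(
    raw, 0,
    process_fn=lambda ex: (ex["Problem"], str(ex["Answer"])),
    data_source="Maxwell-Jia/AIME_2024",
    ability="Math",
    split="test",
)
print(record["prompt"][0]["content"])          # the user question
print(record["reward_model"]["ground_truth"])  # "2"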
106 changes: 106 additions & 0 deletions recipe/r1/main_eval.py
@@ -0,0 +1,106 @@
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Offline evaluate the performance of a generated file using reward model and ground truth verifier.
The input is a parquet file that contains N generated sequences and (optional) the ground truth.

"""

from collections import defaultdict

import hydra
import numpy as np
import pandas as pd
import ray
from tqdm import tqdm

from verl.utils.fs import copy_to_local


def get_custom_reward_fn(config):
import importlib.util, os

reward_fn_config = config.get("custom_reward_function") or {}
file_path = reward_fn_config.get("path")
if not file_path:
return None

if not os.path.exists(file_path):
raise FileNotFoundError(f"Reward function file '{file_path}' not found.")

spec = importlib.util.spec_from_file_location("custom_module", file_path)
module = importlib.util.module_from_spec(spec)
try:
spec.loader.exec_module(module)
except Exception as e:
raise RuntimeError(f"Error loading module from '{file_path}': {e}")

function_name = reward_fn_config.get("name")

if not hasattr(module, function_name):
raise AttributeError(f"Reward function '{function_name}' not found in '{file_path}'.")

print(f"using customized reward function '{function_name}' from '{file_path}'")

return getattr(module, function_name)


@ray.remote
def process_item(reward_fn, data_source, response_lst, reward_data):
ground_truth = reward_data['ground_truth']
score_lst = [reward_fn(data_source, r, ground_truth) for r in response_lst]
return data_source, np.mean(score_lst)


@hydra.main(config_path='config', config_name='evaluation', version_base=None)
def main(config):
local_path = copy_to_local(config.data.path)
dataset = pd.read_parquet(local_path)
prompts = dataset[config.data.prompt_key]
responses = dataset[config.data.response_key]
data_sources = dataset[config.data.data_source_key]
reward_model_data = dataset[config.data.reward_model_key]

total = len(dataset)

# Initialize Ray
if not ray.is_initialized():
ray.init()

# evaluate test_score based on data source
data_source_reward = defaultdict(list)
compute_score = get_custom_reward_fn(config)

# Create remote tasks
remote_tasks = [
process_item.remote(compute_score, data_sources[i], responses[i], reward_model_data[i]) for i in range(total)
]

# Process results as they come in
with tqdm(total=total) as pbar:
while len(remote_tasks) > 0:
# Use ray.wait to get completed tasks
done_ids, remote_tasks = ray.wait(remote_tasks)
for result_id in done_ids:
data_source, score = ray.get(result_id)
data_source_reward[data_source].append(score)
pbar.update(1)

metric_dict = {}
for data_source, rewards in data_source_reward.items():
metric_dict[f'test_score/{data_source}'] = np.mean(rewards)

print(metric_dict)


if __name__ == '__main__':
main()
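process_item invokes the configured scorer as reward_fn(data_source, response, ground_truth) and averages the scores over the N responses of each row. A sketch of a compatible custom reward module (the file name and the exact-match rule are illustrative, not part of this PR); it would be referenced through custom_reward_function.path and custom_reward_function.name in evaluation.yaml:

# my_reward.py -- hypothetical custom reward module
def compute_score(data_source, solution_str, ground_truth):
    # Return 1.0 on an exact match with the ground truth, else 0.0.
    return float(solution_str.strip() == str(ground_truth).strip())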