Annotating training data for the FunASR speech recognition model with Label Studio


For installing and deploying Label Studio, see my earlier article:

How to use label-studio, an open-source tool for annotating model-training data - 星光下的赶路人

1. Annotation

Once installation is complete, upload your audio files and choose a speech recognition task.
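If you prefer to set the project up programmatically rather than clicking through the UI, here is a minimal sketch using the label-studio-sdk package (0.x client interface). The URL, API key, and project title are placeholders, and the XML is an ASR-segments labeling config of the kind Label Studio's template gallery provides; adjust to taste:

from label_studio_sdk import Client  # pip install label-studio-sdk

LABEL_STUDIO_URL = 'http://localhost:8080'  # assumption: local deployment
API_KEY = 'your-api-key'  # from Account & Settings -> Access Token

ls = Client(url=LABEL_STUDIO_URL, api_key=API_KEY)
project = ls.start_project(
    title='FunASR training data',
    label_config='''
<View>
  <Labels name="labels" toName="audio">
    <Label value="Speech"/>
    <Label value="Noise"/>
  </Labels>
  <Audio name="audio" value="$audio"/>
  <TextArea name="transcription" toName="audio"
            rows="2" editable="true" perRegion="true" required="true"/>
</View>
''',
)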

Annotation takes a few steps.

First, drag to select the audio segment you want to annotate; the selected portion turns black.

Then click one of the labels above it; normally you choose the speech label.

Once a label is chosen, the selected part of the audio track turns green and an input box appears below.

Type the transcript for that segment into the input box and click add to submit it.

If you annotated the wrong thing or selected the wrong segment, click the delete button on the right to remove that annotation.

To keep listening to the rest of the audio after finishing a segment, press Esc or click the selected segment again; once the input box below is hidden, you can continue annotating the remaining audio.

When all the text has been annotated, click update to save.


2. Converting the annotations into FunASR training data

Once annotation is complete, export the annotations as a JSON file.

Put the original audio files and the exported JSON file in the same directory.
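Before running the full script, it can help to sanity-check the export structure, since the parser below depends on a few specific fields. A minimal sketch, assuming the standard JSON export (not JSON-MIN) saved as data/data.json:

import json

with open('data/data.json', 'r', encoding='utf-8') as f:
    tasks = json.load(f)

task = tasks[0]
print(task['file_upload'])  # name of the uploaded audio file
for res in task['annotations'][0]['result']:
    # each transcribed segment is a manual 'textarea' result,
    # with start/end given in seconds
    if res.get('type') == 'textarea' and res.get('origin') == 'manual':
        v = res['value']
        print(res['id'], v['start'], v['end'], v['text'][0])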

Then run the following script:

# -*- coding: utf-8 -*-
# @Time    : 2025/8/5 15:46
# @Author  : shilixin
# @File    : datazhunbei.py
import json
import os
import random

from pydub import AudioSegment
import soundfile as sf

def ensure_directory_exists(directory):
    if not os.path.exists(directory):
        os.makedirs(directory)

def parse_json(json_data):
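    # Walk the Label Studio export: one item per audio task. Each manually
    # drawn segment shows up as a 'textarea' result carrying start/end
    # timestamps (in seconds) and the transcript text.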
    parse_results = []
    for json_data_item in json_data:
        parse_result = {}
        annotations = json_data_item.get('annotations', [])

        file_upload = json_data_item.get('file_upload', '')
        parse_result['file'] = file_upload
        data_list = []
        for ann in annotations:
            result = ann.get('result', [])
            for res in result:
                if res.get('type') == 'textarea' and res.get('origin') == 'manual':
                    label_value = res.get('value', {})
                    start = label_value.get('start')
                    end = label_value.get('end')
                    text = label_value.get('text')[0]
                    seg_id = res.get('id')  # avoid shadowing the id() builtin
                    data_list.append({
                        'id': seg_id,
                        'start': start,
                        'end': end,
                        'text': text
                    })
        parse_result['data_list'] = data_list
        parse_results.append(parse_result)
    return parse_results


def split_audio_and_generate_files(data_list, original_audio_path, txt_file_path, scp_file_path, output_dir):
    # Load the original audio file
    try:
        audio = AudioSegment.from_wav(original_audio_path)
    except Exception as e:
        print(f"Failed to load audio file {original_audio_path}: {e}")
        return

    txt_lines = []
    scp_lines = []

    for data in data_list:
        id_ = data['id']
        start_ms = int(data['start'] * 1000)  # seconds -> milliseconds
        end_ms = int(data['end'] * 1000)  # seconds -> milliseconds
        text = data['text']

        # Slice the segment out of the source audio
        segment = audio[start_ms:end_ms]

        # Build the output path for this segment
        audio_filename = f"{id_}.wav"
        wavfiles = os.path.join(output_dir, 'wavfiles')
        ensure_directory_exists(wavfiles)
        audio_filepath = os.path.join(wavfiles, audio_filename)

        # Export the audio segment
        try:
            segment.export(audio_filepath, format="wav")
        except Exception as e:
            print(f"Failed to export audio file {audio_filepath}: {e}")
            continue

        # Collect the TXT line: utterance ID <TAB> transcript
        txt_line = f"{id_}\t{text}\n"
        txt_lines.append(txt_line)

        # Collect the SCP line: utterance ID <TAB> wav path
        scp_line = f"{id_}\t{audio_filepath}\n"
        scp_lines.append(scp_line)

    # Append to the TXT file (append mode, so segments from several source
    # audios accumulate; delete old outputs before re-running the script)
    with open(txt_file_path, 'a', encoding='utf-8') as txt_file:
        txt_file.writelines(txt_lines)

    # Append to the SCP file
    with open(scp_file_path, 'a', encoding='utf-8') as scp_file:
        scp_file.writelines(scp_lines)

    print("Done.")


def convert_wav_to_mono_16k(input_wav_path, output_wav_path=None):
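    # FunASR acoustic models are typically trained on 16 kHz, mono, 16-bit
    # PCM audio, so normalize the source recording to that format first.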
    # Load the audio file
    audio = AudioSegment.from_wav(input_wav_path)

    print("Original audio info:")
    print(f"  channels: {audio.channels}")
    print(f"  sample width: {audio.sample_width * 8} bit")
    print(f"  frame rate (sample rate): {audio.frame_rate} Hz")

    # Convert to mono
    if audio.channels > 1:
        print("Multi-channel audio detected, converting to mono...")
        audio = audio.set_channels(1)
    else:
        print("Audio is already mono, no conversion needed.")

    # Resample to 16 kHz
    target_sample_rate = 16000
    if audio.frame_rate != target_sample_rate:
        print(f"Current sample rate is {audio.frame_rate} Hz, resampling to {target_sample_rate} Hz...")
        audio = audio.set_frame_rate(target_sample_rate)
    else:
        print(f"Sample rate is already {target_sample_rate} Hz, no conversion needed.")

    # Decide the output path
    if output_wav_path is None:
        base, ext = os.path.splitext(input_wav_path)
        output_wav_path = f"{base}_converted{ext}"

    # Export as WAV; the extra ffmpeg parameters force mono, 16 kHz, 16-bit PCM
    # (passing parameters makes pydub shell out to ffmpeg, so ffmpeg must be on PATH)
    audio.export(output_wav_path, format="wav", parameters=["-ac", "1", "-ar", "16000", "-sample_fmt", "s16"])

    print(f"Conversion finished. File saved to: {output_wav_path}")
    return output_wav_path


def split_train_validation(txt_path, scp_path, val_txt_path, val_scp_path, val_ratio=0.2):
    # Read the source files
    with open(txt_path, 'r', encoding='utf-8') as f:
        txt_lines = f.readlines()
    
    with open(scp_path, 'r', encoding='utf-8') as f:
        scp_lines = f.readlines()
    
    # Both files must have the same number of lines
    if len(txt_lines) != len(scp_lines):
        raise ValueError("The TXT and SCP files have different line counts")
    
    # Shuffle the indices and hold out val_ratio of the lines for validation
    total_lines = len(txt_lines)
    print('Total lines:', total_lines)
    val_size = int(total_lines * val_ratio)
    indices = list(range(total_lines))
    random.shuffle(indices)
    val_indices = set(indices[:val_size])
    
    # Split into training and validation sets
    train_txt, val_txt = [], []
    train_scp, val_scp = [], []
    
    for i in range(total_lines):
        if i in val_indices:
            val_txt.append(txt_lines[i])
            val_scp.append(scp_lines[i])
        else:
            train_txt.append(txt_lines[i])
            train_scp.append(scp_lines[i])
    
    # Write the validation files
    with open(val_txt_path, 'w', encoding='utf-8') as f:
        f.writelines(val_txt)
    
    with open(val_scp_path, 'w', encoding='utf-8') as f:
        f.writelines(val_scp)
    
    # Write the training set back (overwrites the source files, keeping the remaining lines)
    with open(txt_path, 'w', encoding='utf-8') as f:
        f.writelines(train_txt)
    
    with open(scp_path, 'w', encoding='utf-8') as f:
        f.writelines(train_scp)
    
    print("Split finished:")
    print(f"Total: {total_lines} lines")
    print(f"Training set: {len(train_txt)} lines")
    print(f"Validation set: {len(val_txt)} lines")


if __name__ == "__main__":
    # Path to the Label Studio JSON export (adjust to your setup)
    json_file_path = 'data/data.json'

    # Directory holding the original audio files
    wav_file_path = 'data'
    
    # Load the JSON export
    with open(json_file_path, 'r', encoding='utf-8') as f:
        json_data = json.load(f)

    # Parse the annotations
    data_list = parse_json(json_data)

    # Output directory (adjust as needed)
    output_dir = 'traindata'

    # Make sure the output directory exists
    ensure_directory_exists(output_dir)
    txt_file_path = os.path.join(output_dir, 'train.txt')
    scp_file_path = os.path.join(output_dir, 'train.scp')
    val_txt_path = os.path.join(output_dir, 'val.txt')
    val_scp_path = os.path.join(output_dir, 'val.scp')

    for data_item in data_list:
        original_audio_path = os.path.join(wav_file_path, data_item['file'])
        bz_data = data_item['data_list']
        original_audio_path = convert_wav_to_mono_16k(original_audio_path)

        # Sanity-check the converted audio
        speech, sample_rate = sf.read(original_audio_path)
        if speech.ndim == 1:
            print("mono")
        elif speech.ndim == 2:
            print(f"multi-channel, channels: {speech.shape[1]}")
        print('Sample rate:', sample_rate)

        split_audio_and_generate_files(bz_data, original_audio_path, txt_file_path, scp_file_path, output_dir)

    split_train_validation(txt_file_path, scp_file_path, val_txt_path, val_scp_path)

After it runs, a traindata folder is created in the current directory.

The folder contains the files needed for training, with the segmented audio clips under the wavfiles subfolder.
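Each line of train.txt is an utterance ID, a tab, then the transcript, and each line of train.scp is an utterance ID, a tab, then the wav path, so the two files must stay line-aligned. A quick sanity check (a sketch; the paths assume the traindata layout produced above and the same working directory):

import os

with open('traindata/train.txt', encoding='utf-8') as f_txt, \
        open('traindata/train.scp', encoding='utf-8') as f_scp:
    for txt_line, scp_line in zip(f_txt, f_scp):
        utt_txt = txt_line.split('\t', 1)[0]
        utt_scp, wav_path = scp_line.rstrip('\n').split('\t', 1)
        assert utt_txt == utt_scp, 'ID mismatch between train.txt and train.scp'
        assert os.path.isfile(wav_path), f'missing wav: {wav_path}'
print('train.txt and train.scp are aligned')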

Point your training script at these files and you can start training.
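One pointer for that step: recent FunASR finetuning recipes read jsonl manifests rather than raw scp/txt pairs, and the funasr pip package ships a scp2jsonl command for the conversion. A minimal sketch, assuming funasr is installed and your version's CLI flags match (check the FunASR finetuning docs if they differ):

import subprocess

# Convert each scp/txt pair into the jsonl manifest expected by
# FunASR's finetune scripts.
for split in ('train', 'val'):
    subprocess.run([
        'scp2jsonl',
        f'++scp_file_list=["traindata/{split}.scp", "traindata/{split}.txt"]',
        '++data_type_list=["source", "target"]',
        f'++jsonl_file_out=traindata/{split}.jsonl',
    ], check=True)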
