vlm_agent/utils_asr.py

150 lines
5.0 KiB
Python
Raw Normal View History

2024-12-02 11:43:41 +08:00
# utils_asr.py
# 同济子豪兄 2024-5-22
# 录音+语音识别
# Import-time side effect: announce on stdout that this module is being loaded.
print('导入录音+语音识别模块')
import pyaudio
import wave
import numpy as np
import os
import sys
from API_KEY import *
# 确定麦克风索引号
# import sounddevice as sd
# print(sd.query_devices())
def record(MIC_INDEX=0, DURATION=5):
    '''
    Record a fixed-length clip from the microphone with the ``arecord`` tool.

    The clip is saved to 'temp/speech_record.wav'; ``arecord`` is asked for
    one channel at 16000 Hz and is run through ``sudo``.

    Args:
        MIC_INDEX: Microphone ID used to build the ``plughw:<ID>`` device
            name; list the available IDs with ``arecord -l``.
        DURATION: Recording length in seconds.
    '''
    # Local import: only this function needs subprocess.
    import subprocess

    output_path = 'temp/speech_record.wav'
    # Make sure the target directory exists so the recorder can create the file.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    print('开始 {} 秒录音'.format(DURATION))

    # An argument list (no shell) keeps MIC_INDEX / DURATION from ever being
    # interpreted as shell syntax. Option order matches the original command:
    # "-f dat" first, then the explicit channel count and rate.
    command = [
        'sudo', 'arecord',
        '-D', 'plughw:{}'.format(MIC_INDEX),
        '-f', 'dat',
        '-c', '1',
        '-r', '16000',
        '-d', str(DURATION),
        output_path,
    ]
    try:
        exit_code = subprocess.run(command).returncode
    except OSError as err:
        # The executable could not be started at all (e.g. sudo is missing).
        print('录音失败,无法执行 arecord: {}'.format(err))
        return

    if exit_code != 0:
        # Report the failure instead of claiming the recording finished.
        print('录音失败,arecord 退出码 {}'.format(exit_code))
        return

    print('录音结束')
def record_auto(MIC_INDEX=1):
    '''
    Record from the microphone with volume-triggered start and stop.

    Audio is read chunk by chunk. Recording is considered started at the
    first chunk whose peak sample exceeds the threshold, and finishes once
    the volume has stayed below the threshold for roughly ``delay_time``
    seconds, or unconditionally after 150 chunks. The selected chunks
    (starting two chunks before the trigger when available) are written to
    'temp/speech_record.wav'.

    Args:
        MIC_INDEX: PyAudio input device index of the microphone.
    '''
    CHUNK = 1024             # frames read from the stream per iteration
    RATE = 16000             # sample rate in Hz
    QUIET_DB = 2000          # threshold compared against the peak raw sample value
    delay_time = 1           # seconds of quiet tolerated before stopping
    CHUNKS_PER_SECOND = 15   # approximation used to convert delay_time to chunks
    MAX_CHUNKS = 150         # hard limit on the number of chunks read
    PRE_ROLL_CHUNKS = 2      # chunks kept from before the trigger chunk
    FORMAT = pyaudio.paInt16
    CHANNELS = 1 if sys.platform == 'darwin' else 2  # channel count
    output_path = 'temp/speech_record.wav'

    frames = []          # every chunk read so far
    flag = False         # True once recording has been triggered
    quiet_flag = False   # True while the volume is below the threshold
    temp_time = 0        # index of the current chunk
    last_ok_time = 0     # chunk index of the last state change
    START_TIME = 0       # chunk index at which recording was triggered
    END_TIME = 0         # chunk index at which recording ended

    p = pyaudio.PyAudio()
    stream = None
    try:
        # Query the sample size while the PyAudio instance is still alive.
        sample_width = p.get_sample_size(FORMAT)
        stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        input=True,
                        frames_per_buffer=CHUNK,
                        input_device_index=MIC_INDEX)
        print('可以说话啦!')

        while True:
            # Read the next chunk and keep it.
            data = stream.read(CHUNK, exception_on_overflow=False)
            frames.append(data)
            # Peak signed 16-bit sample of the chunk; negative peaks are not
            # taken into account.
            temp_volume = np.max(np.frombuffer(data, dtype=np.short))

            if temp_volume > QUIET_DB and not flag:
                print("音量高于阈值,开始录音")
                flag = True
                START_TIME = temp_time
                last_ok_time = temp_time

            if flag:  # recording in progress
                if temp_volume < QUIET_DB and not quiet_flag:
                    print("录音中,当前音量低于阈值")
                    quiet_flag = True
                    last_ok_time = temp_time

                if temp_volume > QUIET_DB:
                    quiet_flag = False
                    last_ok_time = temp_time

                if temp_time > last_ok_time + delay_time * CHUNKS_PER_SECOND and quiet_flag:
                    print("音量低于阈值{:.2f}秒后,检测当前音量".format(delay_time))
                    if quiet_flag and temp_volume < QUIET_DB:
                        print("当前音量仍然小于阈值,录音结束")
                        END_TIME = temp_time
                        break
                    else:
                        print("当前音量重新高于阈值,继续录音中")
                        quiet_flag = False
                        last_ok_time = temp_time

            temp_time += 1
            if temp_time > MAX_CHUNKS:  # timed out: stop unconditionally
                END_TIME = temp_time
                print('超时,录音结束')
                break
    finally:
        # Always release the audio device, even if opening or reading failed.
        if stream is not None:
            stream.stop_stream()
            stream.close()
        p.terminate()

    # Clamp the start index: a negative value would wrap around to the end of
    # the list and drop almost the whole recording when speech starts in the
    # first two chunks (or never starts before the timeout).
    first_chunk = max(START_TIME - PRE_ROLL_CHUNKS, 0)

    # Export the selected chunks as a WAV file.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with wave.open(output_path, 'wb') as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(sample_width)
        wf.setframerate(RATE)
        wf.writeframes(b''.join(frames[first_chunk:END_TIME]))
    print('保存录音文件', output_path)
import appbuilder
# Configure the credential: publish the token as the APPBUILDER_TOKEN
# environment variable before the ASR component is created.
# NOTE(review): APPBUILDER_TOKEN is not defined in this file; presumably it
# comes from the `from API_KEY import *` star import — confirm.
os.environ["APPBUILDER_TOKEN"] = APPBUILDER_TOKEN
asr = appbuilder.ASR() # speech recognition component, created once at import time and shared module-wide
def speech_recognition(audio_path='temp/speech_record.wav'):
    '''
    Transcribe a WAV file with the AppBuilder-SDK speech recognition component.

    Args:
        audio_path: Path of the WAV file to transcribe.

    Returns:
        The first entry of the ``result`` list in the response content.
    '''
    print('开始语音识别')

    # Read every audio frame stored in the WAV file.
    with wave.open(audio_path, 'rb') as wav_file:
        raw_audio = wav_file.readframes(wav_file.getnframes())

    # Build the request and send it through the module-level ASR component.
    payload = {"audio_format": "wav", "raw_audio": raw_audio, "rate": 16000}
    response = asr.run(appbuilder.Message(payload))
    speech_result = response.content['result'][0]

    print('语音识别结果:', speech_result)
    return speech_result