用百度智能云批量识别微信语音

目录
如果你没有在 cnblog 作者为 carr0t2 中看到此文章,建议访问 原网页以获取更好的排版,图片体验

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。

准备工具及环境

  1. Python
  2. silk-v3-decoder https://github.com/kn007/silk-v3-decoder
  3. 百度智能云账号(用百度账号就行),申请 API Key 和 Secret Key
  4. 百度短语音识别API Demo 基于官方Demo代码修改 https://github.com/Baidu-AIP/speech-demo/tree/master/rest-api-asr/python
  5. 本文环境为Windows python3.7

大致思路

  1. 手机微信找到语音文件保存的位置,导出

  2. 用silk-v3-decoder将录音转换为wav格式

  3. ffmpeg将wav转成pcm,采样频率16000

  4. 用python识别

  5. 仅个人处理方法,有问题欢迎指出

具体操作

导出微信语音文件

  • 手机微信语音文件一般保存在内部存储\tencent\MicroMsg\****************************\voice2

    星号里是一个很长的包含数字字母的字符串

    里面包括这许多这样的文件夹
    用百度智能云批量识别微信语音 Python 第1张

  • 全部复制粘贴到并提取出音频文件

    Windows下搜索.amr

    用百度智能云批量识别微信语音 Python 第2张

    全选复制粘贴到一个新文件夹

  • 这些就是录音文件,但是格式比较奇怪,需要处理成常规格式

处理导出语音文件

重命名文件

  • 因为要保持相对顺序,而直接进行转换会导致文件修改时间变化,于是无法恢复正常语音顺序
  • 用python,提取文件修改时间并重命名
import os
import time

path='.\\lecture'
dirs = os.listdir(path)
for file in dirs:
    finfo = os.stat(path+'\\'+file)
    timeArray = time.localtime(finfo.st_mtime)
    nametime = time.strftime("%Y_%m_%d_%H_%M_%S", timeArray)
    os.rename(path+'\\'+file,path+'\\'+nametime+'.amr')
    print(nametime)

转换为pcm格式

  • python通过命令行调用silk_v3_decoder.exe 解码,具体命令写在下面
  • pcm文件好像无法直接播放,Audacity是可以的

修改Demo代码

前面的修改

  • silk_v3_decoder.exe转格式为16k pcm
    FORMAT = 'pcm'
    pathamr=r'.\amr'
    pathpcm=r'.\pcm'
    dirs = os.listdir(pathamr)
    #dirs.remove('desktop.ini')### Windows可能会有这个文件
    for file in dirs:
        time.sleep(0.3)
        name=file[:-3]
        commandstring= ' silk_v3_decoder.exe ' + str(pathamr) + '\\' + name + 'amr ' + str(pathpcm) +'\\'+ str(name) + 'pcm' +' -Fs_API 16000 '
        os.system(commandstring)
        AUDIO_FILE =str(pathpcm)+'\\'+ str(name) + 'pcm'

后面的修改

  • 使输出为追加,并且增加时间字段,后续处理还没有做,所以导出的文件还是json
	    with open("result.txt","a") as of:
            result_dict=eval(result_str)
            result_dict["time"]=name
            of.write(str(result_dict)+'\n')

用百度智能云批量识别微信语音 Python 第3张

后记

  • 刚学python,随便写写,欢迎指出错误
  • 文件后续处理还没有做好,想做成输出是前面一行时间,后面一行识别内容,如果有识别偏差较大,方便找到位置重新听
  • 没有做到全程自动化,还是要手动处理内容的。
  • 没用到百度的语音自训练平台

代码(仅供参考)

import sys
import json
import base64
import time
import os
import subprocess

IS_PY3 = sys.version_info.major == 3

if IS_PY3:
    from urllib.request import urlopen
    from urllib.request import Request
    from urllib.error import URLError
    from urllib.parse import urlencode
    timer = time.perf_counter
else:
    from urllib2 import urlopen
    from urllib2 import Request
    from urllib2 import URLError
    from urllib import urlencode
    if sys.platform == "win32":
        timer = time.clock
    else:
        # On most other platforms the best timer is time.time()
        timer = time.time

API_KEY = '****************'### 填入自己的
SECRET_KEY = '*****************'

# 需要识别的文件
# 文件格式
FORMAT = 'pcm'  # 文件后缀只支持 pcm/wav/amr 格式,极速版额外支持m4a 格式
###这里为了方便直接限制死
CUID = '****************'
# 采样率
RATE = 16000  # 固定值

DEV_PID = 1537  # 1537 表示识别普通话,使用输入法模型。根据文档填写PID,选择语言及识别模型
ASR_URL = 'http://vop.baidu.com/server_api'
SCOPE = 'audio_voice_assistant_get'  # 有此scope表示有asr能力,没有请在网页里勾选,非常旧的应用可能没有


class DemoError(Exception):
    pass


"""  TOKEN start """

TOKEN_URL = 'http://openapi.baidu.com/oauth/2.0/token'

def fetch_token():
    params = {'grant_type': 'client_credentials',
              'client_id': API_KEY,
              'client_secret': SECRET_KEY}
    post_data = urlencode(params)
    if (IS_PY3):
        post_data = post_data.encode( 'utf-8')
    req = Request(TOKEN_URL, post_data)
    try:
        f = urlopen(req)
        result_str = f.read()
    except URLError as err:
        print('token http response http code : ' + str(err.code))
        result_str = err.read()
    if (IS_PY3):
        result_str =  result_str.decode()

    print(result_str)
    result = json.loads(result_str)
    print(result)
    if ('access_token' in result.keys() and 'scope' in result.keys()):
        print(SCOPE)
        if SCOPE and (not SCOPE in result['scope'].split(' ')):  # SCOPE = False 忽略检查
            raise DemoError('scope is not correct')
        print('SUCCESS WITH TOKEN: %s  EXPIRES IN SECONDS: %s' % (result['access_token'], result['expires_in']))
        return result['access_token']
    else:
        raise DemoError('MAYBE API_KEY or SECRET_KEY not correct: access_token or scope not found in token response')

"""  TOKEN end """

if __name__ == '__main__':
    token = fetch_token()

    pathamr=r'.\amr'
    pathpcm=r'.\pcm'
    dirs = os.listdir(pathamr)
    #dirs.remove('desktop.ini')### Windows可能会有这个文件
    for file in dirs:
        time.sleep(0.2)
        name=file[:-3]
        commandstring= ' silk_v3_decoder.exe ' + str(pathamr) + '\\' + name + 'amr ' + str(pathpcm) +'\\'+ str(name) + 'pcm' +' -Fs_API 16000 '
        os.system(commandstring)
        ######下面没怎么动过了
        AUDIO_FILE =str(pathpcm)+'\\'+ str(name) + 'pcm'
        speech_data = []
        with open(AUDIO_FILE, 'rb') as speech_file:
            speech_data = speech_file.read()

        length = len(speech_data)
        if length == 0:
            raise DemoError('file %s length read 0 bytes' % AUDIO_FILE)
        speech = base64.b64encode(speech_data)
        if (IS_PY3):
            speech = str(speech, 'utf-8')
        params = {'dev_pid': DEV_PID,
                 #"lm_id" : LM_ID,    #测试自训练平台开启此项
                  'format': FORMAT,
                  'rate': RATE,
                  'token': token,
                  'cuid': CUID,
                  'channel': 1,
                  'speech': speech,
                  'len': length
                  }
        post_data = json.dumps(params, sort_keys=False)
        # print post_data
        req = Request(ASR_URL, post_data.encode('utf-8'))
        req.add_header('Content-Type', 'application/json')
        try:
            begin = timer()
            f = urlopen(req)
            result_str = f.read()
            print ("Request time cost %f" % (timer() - begin))
        except URLError as err:
            print('asr http response http code : ' + str(err.code))
            result_str = err.read()

        if (IS_PY3):
            result_str = str(result_str, 'utf-8')
        print(result_str)
        with open("result.txt","a") as of:
            result_dict=eval(result_str)
            #result_dict["time"]=name
            #of.write(str(result_dict)+'\n')
            of.write('{'+name+'}'+'\n')
            try:
                of.write(str(result_dict["result"])[2:-2]+'\n\n')
            except:
                of.write('Error'+'\n')
扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄