以下の記事では,Pythonで音の再生・録音を行うためのライブラリ sounddevice
を紹介しました。
Python-sounddevice で音声や歌声をリアルタイム収音・再生・録音 - Wizard Notes
この記事では、音の信号処理/アプリ開発でよく使う、音声ファイルの再生をsounddevice
を使ってどのように実装するかを紹介します。
方法1: オーディオをそのまま再生する
sounddevice
を使った音声ファイルの再生で最も簡単なのが sd.play()
を使う方法です。
sd.play()
を使えば下記の例のように数行で音を再生できます。
ただし、再生する音信号データを丸ごとsd.play()
に渡すため,マイク入力等に応じてリアルタイムで信号を加工し再生する用途には不向きです
import sounddevice as sd import soundfile as sf filepath = "./audio.wav" # sig: 信号, sr: サンプリング周波数 sig, sr = sf.read(filepath, always_2d=True) sd.play(sig, sr) sd.wait() # sd.playが完了するのを待つ print("End")
なお、再生途中でsd.stop()
を実行すると再生を停止することができます。
また、オーディオファイルの再生にはsoundfile
を使っています。詳しくは以下の記事をご覧ください。
方法2: オーディオをリアルタイム処理して再生
こちらはblocksize
の短いサンプル数ごとに信号を処理するため,マイク入力やその他の信号に応じてリアルタイムで音信号を加工するようなエフェクタを作るのに向いています。
import sounddevice as sd import soundfile as sf import threading """ filepath = "./audio.wav" # sig: 信号, sr: サンプリング周波数 sig, sr = sf.read(filepath, always_2d=True) sd.play(sig, sr) sd.wait() # sd.playが完了するのを待つ print("End") """ filepath = "./elepi.wav" sig, sr = sf.read(filepath, always_2d=True) # sig: 信号, sr: サンプリング周波数 n_samples, n_channels = sig.shape blocksize = 1024 current_frame = 0 # コールバック関数 def callback(indata, outdata, frames, time, status): global current_frame, n_samples, n_channelss chunksize = min(n_samples - current_frame, frames) outdata[:] *= 0.0 # チャンネルごとの信号処理 for k in range(n_channels): outdata[0:chunksize, k] = self.sig[current_frame:current_frame + chunksize, k] if chunksize < frames: raise sd.CallbackStop() current_frame += chunksize event = threading.Event() with sd.Stream( samplerate=sr, blocksize=blocksize, channels=sig.shape[1], callback=callback, finished_callback=event.set ): print("start") event.wait() # 再生終了まで待機 print("End")
方法2.1 マルチスレッド処理
方法2でマルチスレッド処理を利用することで、バックグラウンドでのリアルタイム再生・信号処理ができます。
import sounddevice as sd import soundfile as sf import threading class Player(threading.Thread): def __init__(self, sig, sr, blocksize = 1024): super(Player, self).__init__() self.sig = sig self.sr = sr self.n_samples, self.n_channels = sig.shape self.blocksize = blocksize def callback(self, indata, outdata, frames, time, status): chunksize = min(self.n_samples - self.current_frame, frames) outdata[:] *= 0.0 # チャンネルごとの信号処理 for k in range(self.n_channels): outdata[0:chunksize, k] = self.sig[self.current_frame:self.current_frame + chunksize, k] if chunksize < frames: raise sd.CallbackStop() self.current_frame += chunksize def run(self): self.current_frame = 0 self.event = threading.Event() with sd.Stream( samplerate=self.sr, blocksize=self.blocksize, channels=self.n_channels, callback=self.callback, finished_callback=self.event.set ): self.event.wait() filepath = "./audio.wav" sig, sr = sf.read(filepath, always_2d=True) player = Player(sig, sr) player.start() print("Start") import code console = code.InteractiveConsole(locals=locals()) console.interact()
方法3: オーディオをリアルタイム処理して再生(同期処理)
方法2はcallback関数を渡すことで非同期(ノンブロッキング)処理となっていました。
もう一つの再生方法として、stream.write()
によって信号データを書き込むことで
同期(ノンブロッキング)処理として音声ファイルの再生ができます。
import numpy as np import sounddevice as sd import soundfile as sf filepath = "./audio.wav" sig, sr = sf.read(filepath, always_2d=True) # sig: 信号, sr: サンプリング周波数 n_samples, n_channels = sig.shape blocksize = 1024 current_frame = 0 chunk = np.zeros((blocksize, n_channels)) dtype = np.float32 with sd.OutputStream( samplerate=sr, blocksize=blocksize, channels=n_channels, dtype=dtype ) as stream: while True: chunksize = min(n_samples - current_frame, blocksize) chunk[0:chunksize] = sig[current_frame:current_frame + chunksize, :] stream.write( chunk.astype(dtype) ) # chunkの信号を書き込み if chunksize < blocksize: break current_frame += chunksize
まとめ
sounddevice
を使った音声ファイルの再生・リアルタイム処理の実装方法を紹介しました。
ご自身の作成している信号処理/アプリ開発に合った実装方法を是非利用してみてください。