Wav2Lipのオープンソース版を改造して外部から呼べるAPI化する
POSTED BY
2025-08-27
2025-08-27



Wav2Lipのオープンソース版で静止画の口元のみを動かして喋らせる
前回Wav2Lipのオープンソース版をコンソール利用までは成功したが、実用するにはこれをAPI化して、アプリなど外部から画像とWAVをバイナリで受け取り動画を返す必要がある。
これには本体のCUIソースのinference.pyを改造するのが手っ取り速そうである。
https://github.com/zabique/Wav2Lip/blob/master/inference.py
改造したソースはこちら。
Python | wav2lip_service.py | GitHub Source |
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os, tempfile, shutil, subprocess, gc, atexit, asyncio, copy from typing import List import numpy as np import cv2 import torch from fastapi import FastAPI, File, UploadFile, Form, HTTPException from fastapi.responses import FileResponse from tqdm import tqdm # ---- Wav2Lip リポのモジュール(PYTHONPATH を Wav2Lip ルートに通しておく)---- import face_detection from models import Wav2Lip import audio as audio_lib # 同梱の audio.py # ========================================================= # 設定 # ========================================================= DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu' # Wav2Lip本体 FACE_DEVICE = os.environ.get("W2L_FACE_DEVICE", "cpu") # 顔検出(既定: CPU) CKPT_PATH = os.environ.get("W2L_CKPT", "checkpoints/wav2lip_gan.pth") IMG_SIZE = 96 MEL_STEP_SIZE = 16 DEFAULTS = dict( pads=[0,10,0,0], face_det_batch_size=16, wav2lip_batch_size=64, # メモリ抑制のため既定値を少し小さめに resize_factor=1, crop=[0,-1,0,-1], box=[-1,-1,-1,-1], rotate=False, nosmooth=False, fps=25.0, static=None, # Noneなら自動判定 ) # 作業用ベースディレクトリ(カレント/working_dir に固定) BASE_WORK_DIR = os.path.join(os.getcwd(), "working_dir") os.makedirs(BASE_WORK_DIR, exist_ok=True) # ========================================================= # アプリ & グローバル # ========================================================= app = FastAPI(title="Wav2Lip Service", root_path="/wav2lip_api") MODEL = None _infer_lock = asyncio.Semaphore(1) # 同時推論を直列化 # ========================================================= # 初期化と後始末 # ========================================================= def _load_checkpoint(checkpoint_path: str): if DEVICE == 'cuda': return torch.load(checkpoint_path) else: return torch.load(checkpoint_path, map_location=lambda storage, loc: storage) def load_model_once(path: str): model = Wav2Lip() ckpt = _load_checkpoint(path) s = ckpt["state_dict"] new_s = {k.replace('module.', ''): v for k, v in s.items()} model.load_state_dict(new_s) return model.to(DEVICE).eval() @app.on_event("startup") def _startup(): global MODEL MODEL = load_model_once(CKPT_PATH) print(f"[startup] Wav2Lip on {DEVICE}, FaceDetector on {FACE_DEVICE}", flush=True) print(f"[startup] Loaded checkpoint: {CKPT_PATH}", flush=True) @atexit.register def _cleanup(): try: if torch.cuda.is_available(): torch.cuda.empty_cache() torch.cuda.ipc_collect() except Exception: pass # ========================================================= # ユーティリティ # ========================================================= def get_smoothened_boxes(boxes, T): for i in range(len(boxes)): window = boxes[i: i+T] if i+T <= len(boxes) else boxes[len(boxes)-T:] boxes[i] = np.mean(window, axis=0) return boxes def face_detect(images, pads: List[int], batch_size: int, nosmooth: bool): """毎回ローカルで FaceAlignment を生成して破棄 → 状態・履歴汚染を防ぐ""" detector = face_detection.FaceAlignment( face_detection.LandmarksType._2D, flip_input=False, device=FACE_DEVICE ) local_bs = batch_size preds = [] try: while True: try: preds = [] for i in range(0, len(images), local_bs): preds.extend(detector.get_detections_for_batch(np.array(images[i:i+local_bs]))) except RuntimeError: if local_bs == 1: raise RuntimeError('Image too big for face-detector. Try resize_factor.') local_bs //= 2 print(f"[warn] face-det OOM -> retry batch={local_bs}", flush=True) continue break results = [] pady1, pady2, padx1, padx2 = pads for rect, image in zip(preds, images): if rect is None: raise ValueError('Face not detected in some frames.') y1 = max(0, rect[1] - pady1); y2 = min(image.shape[0], rect[3] + pady2) x1 = max(0, rect[0] - padx1); x2 = min(image.shape[1], rect[2] + padx2) results.append([x1, y1, x2, y2]) boxes = np.array(results) if not nosmooth: boxes = get_smoothened_boxes(boxes, T=5) out = [[im[y1:y2, x1:x2], (y1, y2, x1, x2)] for im, (x1, y1, x2, y2) in zip(images, boxes)] return out finally: del detector gc.collect() def datagen(frames, mels, p): img_batch, mel_batch, frame_batch, coords_batch = [], [], [], [] static = p['static']; box = p['box']; img_size = p['img_size'] pads = p['pads']; face_det_bs = p['face_det_batch_size'] nosmooth = p['nosmooth']; batch_size = p['wav2lip_batch_size'] if box[0] == -1: face_det_results = face_detect(frames if not static else [frames[0]], pads, face_det_bs, nosmooth) else: y1, y2, x1, x2 = box face_det_results = [[f[y1:y2, x1:x2], (y1, y2, x1, x2)] for f in frames] for i, m in enumerate(mels): idx = 0 if static else i % len(frames) fsave = frames[idx].copy() face, coords = face_det_results[idx].copy() face = cv2.resize(face, (img_size, img_size)) img_batch.append(face); mel_batch.append(m); frame_batch.append(fsave); coords_batch.append(coords) if len(img_batch) >= batch_size: img_batch_np, mel_batch_np = np.asarray(img_batch), np.asarray(mel_batch) img_masked = img_batch_np.copy(); img_masked[:, img_size//2:] = 0 img_batch_np = np.concatenate((img_masked, img_batch_np), axis=3) / 255. mel_batch_np = np.reshape(mel_batch_np, [len(mel_batch_np), mel_batch_np.shape[1], mel_batch_np.shape[2], 1]) yield img_batch_np, mel_batch_np, frame_batch, coords_batch img_batch, mel_batch, frame_batch, coords_batch = [], [], [], [] if len(img_batch) > 0: img_batch_np, mel_batch_np = np.asarray(img_batch), np.asarray(mel_batch) img_masked = img_batch_np.copy(); img_masked[:, img_size//2:] = 0 img_batch_np = np.concatenate((img_masked, img_batch_np), axis=3) / 255. mel_batch_np = np.reshape(mel_batch_np, [len(mel_batch_np), mel_batch_np.shape[1], mel_batch_np.shape[2], 1]) yield img_batch_np, mel_batch_np, frame_batch, coords_batch def _detect_static(face_path: str) -> bool: ext = os.path.splitext(face_path.lower())[1].lstrip('.') if ext in ('jpg','jpeg','png'): return True img_try = cv2.imread(face_path) return (img_try is not None) def run_wav2lip(face_path: str, audio_path: str, out_mp4: str, user_params: dict) -> str: # ---- dictは deep copy してから上書き(リスト汚染防止)---- p = copy.deepcopy(DEFAULTS) p.update(user_params or {}) p['img_size'] = IMG_SIZE if p['static'] is None: p['static'] = _detect_static(face_path) if not os.path.isfile(face_path): raise ValueError('face path invalid') # ---- 入力フレーム取得 ---- if p['static']: full_frames = [cv2.imread(face_path)] fps = float(p['fps']) else: vs = cv2.VideoCapture(face_path); fps = vs.get(cv2.CAP_PROP_FPS); full_frames=[] try: while True: ok, frame = vs.read() if not ok: break if p['resize_factor'] > 1: frame = cv2.resize(frame, (frame.shape[1]//p['resize_factor'], frame.shape[0]//p['resize_factor'])) if p['rotate']: frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE) y1, y2, x1, x2 = p['crop'] if x2 == -1: x2 = frame.shape[1] if y2 == -1: y2 = frame.shape[0] frame = frame[y1:y2, x1:x2] full_frames.append(frame) finally: vs.release() # ---- 音声: 16k/mono に整形 ---- workdir = tempfile.mkdtemp(prefix='w2l_', dir=BASE_WORK_DIR) tmp_wav = os.path.join(workdir, 'audio.wav') subprocess.check_call(f'ffmpeg -y -i "{audio_path}" -ar 16000 -ac 1 -f wav "{tmp_wav}"', shell=True) wav = audio_lib.load_wav(tmp_wav, 16000) mel = audio_lib.melspectrogram(wav) if np.isnan(mel.reshape(-1)).sum() > 0: raise ValueError('Mel contains NaN. Try adding small noise to wav.') # ---- mel をフレーム数に合わせて分割 ---- mel_chunks = [] mel_idx_multiplier = 80. / (p['fps'] if p['static'] else (fps or p['fps'])) i = 0 while True: start_idx = int(i * mel_idx_multiplier) if start_idx + MEL_STEP_SIZE > len(mel[0]): mel_chunks.append(mel[:, len(mel[0]) - MEL_STEP_SIZE:]) break mel_chunks.append(mel[:, start_idx: start_idx + MEL_STEP_SIZE]) i += 1 full_frames = full_frames[:len(mel_chunks)] # ---- 合成 ---- frame_h, frame_w = full_frames[0].shape[:-1] temp_avi = os.path.join(workdir, 'result.avi') outv = cv2.VideoWriter( temp_avi, cv2.VideoWriter_fourcc(*'DIVX'), float(p['fps'] if p['static'] else (fps or p['fps'])), (frame_w, frame_h) ) gen = datagen(full_frames.copy(), mel_chunks, p) # --- 同期的・安全版 推論ループ(非同期コピー禁止) --- total_steps = int(np.ceil(len(mel_chunks)/p['wav2lip_batch_size'])) for img_batch, mel_batch, frames, coords in tqdm(gen, total=total_steps, desc='infer', leave=False): # CPU -> GPU は同期(non_blocking=False) img_np = np.transpose(img_batch, (0,3,1,2)).astype(np.float32) mel_np = np.transpose(mel_batch, (0,3,1,2)).astype(np.float32) img_t = torch.from_numpy(img_np).to(DEVICE) mel_t = torch.from_numpy(mel_np).to(DEVICE) with torch.inference_mode(): pred = MODEL(mel_t, img_t) # ここでGPU計算を完了させる if torch.cuda.is_available(): torch.cuda.synchronize() # CPU に同期コピー(non_blocking=False) pred_cpu = pred.detach().to('cpu') pred_np = pred_cpu.numpy().transpose(0,2,3,1) * 255.0 # 直ちにGPUテンソルを解放 del pred, img_t, mel_t, pred_cpu # 合成 for pimg, f, c in zip(pred_np, frames, coords): y1, y2, x1, x2 = c pimg = cv2.resize(pimg.astype(np.uint8), (x2 - x1, y2 - y1)) f[y1:y2, x1:x2] = pimg outv.write(f) # 大きい配列は都度破棄 del pred_np, img_batch, mel_batch, frames, coords, img_np, mel_np gc.collect() outv.release() if torch.cuda.is_available(): torch.cuda.synchronize() # ---- 音声とmuxしてmp4 ---- os.makedirs(os.path.dirname(out_mp4), exist_ok=True) subprocess.check_call( f'ffmpeg -y -i "{tmp_wav}" -i "{temp_avi}" -shortest -c:v libx264 -pix_fmt yuv420p ' f'-movflags +faststart -c:a aac "{out_mp4}"', shell=True ) # 作業ディレクトリを掃除(出力は out_mp4 側にある) try: shutil.rmtree(workdir, ignore_errors=True) except Exception: pass # さらに状態を残さない del gen, full_frames, mel_chunks gc.collect() return out_mp4 def resize_with_padding(image_path, output_path, target_size=(1280, 720)): # 画像読み込み img = cv2.imread(image_path) if img is None: raise ValueError("画像が読み込めませんでした") h, w = img.shape[:2] target_w, target_h = target_size # 縦横比を維持しながら縮小 scale = min(target_w / w, target_h / h) new_w = int(w * scale) new_h = int(h * scale) resized = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_AREA) # 黒背景キャンバス作成 result = np.zeros((target_h, target_w, 3), dtype=np.uint8) # 中央に配置するためのオフセット計算 x_offset = (target_w - new_w) // 2 y_offset = (target_h - new_h) // 2 result[y_offset:y_offset+new_h, x_offset:x_offset+new_w] = resized # 保存 cv2.imwrite(output_path, result) # ========================================================= # エンドポイント # ========================================================= @app.post("/lip-sync") async def lip_sync( face: UploadFile = File(...), audio: UploadFile = File(...), fps: float = Form(25.0), pads: str = Form("0,10,0,0"), resize_factor: int = Form(1), rotate: int = Form(0), nosmooth: int = Form(0), fbs: int = Form(16), wbs: int = Form(64), crop: str = Form("0,-1,0,-1"), box: str = Form("-1,-1,-1,-1"), ): # 同時実行制御 async with _infer_lock: work_in = tempfile.mkdtemp(prefix="w2l_in_", dir=BASE_WORK_DIR) outdir = tempfile.mkdtemp(prefix="w2l_out_", dir=BASE_WORK_DIR) face_path = os.path.join(work_in, face.filename or "face_input") face_trans = os.path.join(work_in, "face_trans.jpg") audio_path = os.path.join(work_in, audio.filename or "audio_input") out_mp4 = os.path.join(outdir, "result.mp4") # 一時保存(大きなファイルでも扱えるように) with open(face_path, "wb") as f: shutil.copyfileobj(face.file, f) with open(audio_path, "wb") as f: shutil.copyfileobj(audio.file, f) resize_with_padding(face_path, face_trans); params = { "fps": float(fps), "pads": [int(x) for x in pads.split(",")], "resize_factor": int(resize_factor), "rotate": (rotate == 1), "nosmooth": (nosmooth == 1), "face_det_batch_size": int(fbs), "wav2lip_batch_size": int(wbs), "crop": [int(x) for x in crop.split(",")], "box": [int(x) for x in box.split(",")], "static": None, } try: out_path = run_wav2lip(face_trans, audio_path, out_mp4, params) return FileResponse(out_path, media_type="video/mp4", filename="result.mp4") except Exception as e: raise HTTPException(status_code=400, detail=str(e)) finally: try: await face.close() await audio.close() except Exception: pass gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() torch.cuda.ipc_collect() # 入力側の一時ディレクトリは削除(出力側は FileResponse が使うので保持) # try: # shutil.rmtree(work_in, ignore_errors=True) # except Exception: # pass
・FastCGIによりaudioとface画像を受け取り顔判定して口パク動画を返却
・入力画像は、動作安定のためすべて1280×720に変換している
・外部から/wav2lip_api/lip-syncと呼べるようroot_pathを設定
・モデルのロードを毎回行わず、起動時に1回のみ読み込み常駐させる
・前回の顔判定データが残らないよう、加工データは毎回すべてクリア
・GPUメモリにキャッシュが積み上がらないよう毎回クリア
作業ディレクトリはWav2Lip/working_dirとしているので作成しておく。
モデルファイルはWav2Lip/checkpoints/wav2lip_gan.pthを使用。
必要モジュール
pip install fastapi uvicorn python-multipart opencv-python-headless tqdm
起動
uvicorn wav2lip_service:app --host 0.0.0.0 --port 8080 --workers 1
バックグラウンド常時起動(wav2lip_service.logにログ出力)
nohup uvicorn wav2lip_service:app --host 127.0.0.1 --port 8080 > wav2lip_service.log 2>&1 &
このサーバーがtest.hogeserver.jpであってApache+SSL稼働中であるなら、リバースプロキシの設定をして外部との橋渡しを行う。
<location> Require all granted ProxyPass http://127.0.0.1:8080 ProxyPassReverse http://127.0.0.1:8080 RequestHeader set X_FORWARDED_PROTO 'https' </location>
クライアント実行テスト
curl -X POST "https://test.hogeserver.jp/wav2lip_api/lip-sync" -F "face=@face.jpg" -F "audio=@voice.wav" --output result.mp4 -v
Android
iPhone/iPad
Flutter
MacOS
Windows
Debian
Ubuntu
CentOS
FreeBSD
RaspberryPI
HTML/CSS
C/C++
PHP
Java
JavaScript
Node.js
Swift
Python
MatLab
Amazon/AWS
CORESERVER
Google
仮想通貨
LINE
OpenAI/ChatGPT
IBM Watson
Microsoft Azure
Xcode
VMware
MySQL
PostgreSQL
Redis
Groonga
Git/GitHub
Apache
nginx
Postfix
SendGrid
Hackintosh
Hardware
Fate/Grand Order
ウマ娘
将棋
ドラレコ
※本記事は当サイト管理人の個人的な備忘録です。本記事の参照又は付随ソースコード利用後にいかなる損害が発生しても当サイト及び管理人は一切責任を負いません。
※本記事内容の無断転載を禁じます。
※本記事内容の無断転載を禁じます。
【WEBMASTER/管理人】
自営業プログラマーです。お仕事ください!ご連絡は以下アドレスまでお願いします★
【キーワード検索】
【最近の記事】【全部の記事】
Wav2Lipのオープンソース版を改造して外部から呼べるAPI化するWav2Lipのオープンソース版で静止画の口元のみを動かして喋らせる
【iOS】アプリアイコン・ロゴ画像の作成・設定方法
オープンソースリップシンクエンジンSadTalkerをAPI化してアプリから呼ぶ【2】
オープンソースリップシンクエンジンSadTalkerをAPI化してアプリから呼ぶ【1】
【Xcode】iPhone is not available because it is unpairedの対処法
【Let's Encrypt】Failed authorization procedure 503の対処法
【Debian】古いバージョンでapt updateしたら404 not foundでエラーになる場合
ファイアウォール内部のWindows11 PCにmacOS Sequoiaからリモートデスクトップする
ファイアウォール内部のNode.js+Socket.ioを外部からProxyPassを通して使う
【人気の記事】【全部の記事】
進研ゼミチャレンジタッチをAndroid端末化するWindows11+WSL2でUbuntuを使う【2】ブリッジ接続+固定IPの設定
【Windows10】リモートデスクトップ間のコピー&ペーストができなくなった場合の対処法
【Apache】サーバーに同時接続可能なクライアント数を調整する
GitLabにHTTPS経由でリポジトリをクローン&読み書きを行う
size_tとssize_tを使い分けてSegmentation Faultを予防する
【Debian】古いバージョンでapt updateしたら404 not foundでエラーになる場合
Windows11のコマンドプロンプトでテキストをコピーする
【C/C++】小数点以下の切り捨て・切り上げ・四捨五入
システムで予約済みのパーティションを更新できませんでした このPCは現在Windows11のシステム要件を満たしていません
【カテゴリーリンク】
Android
iPhone/iPad
Flutter
MacOS
Windows
Debian
Ubuntu
CentOS
FreeBSD
RaspberryPI
HTML/CSS
C/C++
PHP
Java
JavaScript
Node.js
Swift
Python
MatLab
Amazon/AWS
CORESERVER
Google
仮想通貨
LINE
OpenAI/ChatGPT
IBM Watson
Microsoft Azure
Xcode
VMware
MySQL
PostgreSQL
Redis
Groonga
Git/GitHub
Apache
nginx
Postfix
SendGrid
Hackintosh
Hardware
Fate/Grand Order
ウマ娘
将棋
ドラレコ