Step 1: Import the Workflow Template
Open the n8n admin interface and import the prebuilt workflow template (a scripted alternative is sketched after the steps):
1. Open the n8n UI: open http://localhost:5678 in your browser.
2. Create a new workflow: click "New Workflow" and choose "Import from JSON".
3. Paste the workflow JSON: paste the contents of the downloaded workflow JSON into the import dialog.
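If you would rather script the import, recent n8n versions expose a public REST API. The sketch below is a convenience under several assumptions, not part of the original workflow: it presumes the public API is enabled, an API key has been created in the n8n settings, and the exported JSON contains only fields the endpoint accepts (exports sometimes carry extras such as "id" or "tags" that the API may reject).

import json
import requests

N8N_URL = "http://localhost:5678"
API_KEY = "your-n8n-api-key"  # assumption: created beforehand in the n8n UI

with open("workflow.json", "r", encoding="utf-8") as f:
    workflow = json.load(f)

# POST the workflow definition to n8n's public REST API
resp = requests.post(
    f"{N8N_URL}/api/v1/workflows",
    headers={"X-N8N-API-KEY": API_KEY},
    json=workflow,
)
resp.raise_for_status()
print("Imported workflow id:", resp.json()["id"])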
Step 2: Configure Node Parameters
Video input node configuration
{
  "name": "Video Input",
  "type": "n8n-nodes-base.webhook",
  "parameters": {
    "path": "video-input",
    "httpMethod": "POST",
    "responseMode": "onReceived",
    "options": {
      "rawBody": true
    }
  }
}
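Once the workflow is active you can smoke-test the webhook by POSTing a payload to it. The sketch below assumes the production webhook URL (n8n serves active webhooks under /webhook/<path>) and a JSON body with a base64-encoded videoBase64 field, matching the expression the Gemini node reads:

import base64
import requests

# test_clip.mp4 is a placeholder file name
with open("test_clip.mp4", "rb") as f:
    video_b64 = base64.b64encode(f.read()).decode("utf-8")

resp = requests.post(
    "http://localhost:5678/webhook/video-input",
    json={"videoBase64": video_b64},  # read later as {{$json.videoBase64}}
)
print(resp.status_code, resp.text)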
Gemini video analysis node
{
  "name": "Gemini Video Analysis",
  "type": "n8n-nodes-base.httpRequest",
  "parameters": {
    "url": "https://generativelanguage.googleapis.com/v1/models/gemini-pro-vision:generateContent",
    "authentication": "genericCredentialType",
    "genericAuthType": "httpHeaderAuth",
    "httpHeaderAuth": {
      "name": "x-goog-api-key",
      "value": "={{$env.GEMINI_API_KEY}}"
    },
    "method": "POST",
    "body": {
      "contents": [{
        "parts": [{
          "text": "Analyze the main content of this video, covering: 1. the core theme, 2. key scene descriptions, 3. a suitable narration angle, 4. suggested cut points. Return the result as JSON."
        }, {
          "inline_data": {
            "mime_type": "video/mp4",
            "data": "={{$json.videoBase64}}"
          }
        }]
      }]
    }
  }
}
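When debugging, it can help to issue the same request outside n8n. Below is a minimal sketch under the same assumptions as the node above (model name and API version copied from the config; note that inlined base64 video is subject to Gemini's request-size limits, so long clips would need to be uploaded separately instead):

import base64
import os
import requests

API_KEY = os.environ["GEMINI_API_KEY"]
URL = ("https://generativelanguage.googleapis.com/v1/models/"
       "gemini-pro-vision:generateContent")

with open("test_clip.mp4", "rb") as f:  # placeholder file name
    video_b64 = base64.b64encode(f.read()).decode("utf-8")

payload = {
    "contents": [{
        "parts": [
            {"text": "Analyze the main content of this video and return JSON."},
            {"inline_data": {"mime_type": "video/mp4", "data": video_b64}},
        ]
    }]
}
# The API also accepts the key as a query parameter
resp = requests.post(URL, params={"key": API_KEY}, json=payload, timeout=300)
resp.raise_for_status()
print(resp.json()["candidates"][0]["content"]["parts"][0]["text"])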
FFmpeg video processing node
import json
import subprocess

def process_video_segments(video_path, segments_data):
    """Cut the source video into segments based on the AI analysis result."""
    segments = json.loads(segments_data)
    output_files = []
    # "key_scenes" matches the output format defined in the analysis prompt below
    for i, segment in enumerate(segments['key_scenes']):
        start_time = segment['start_time']
        end_time = segment['end_time']
        output_path = f"segment_{i}_{start_time}_{end_time}.mp4"
        # Cut the segment with FFmpeg
        cmd = [
            'ffmpeg',
            '-i', video_path,
            '-ss', str(start_time),
            '-to', str(end_time),
            '-c', 'copy',  # stream copy: fast, but cuts snap to keyframes
            '-avoid_negative_ts', 'make_zero',
            output_path
        ]
        subprocess.run(cmd, check=True)
        output_files.append(output_path)
    return output_files
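A quick usage sketch, with hand-written segment data in the shape the analysis prompt requests:

import json

segments_json = json.dumps({
    "key_scenes": [
        {"start_time": 0, "end_time": 12, "description": "opening", "importance_score": 8},
        {"start_time": 45, "end_time": 60, "description": "climax", "importance_score": 9},
    ]
})
clips = process_video_segments("input.mp4", segments_json)
print(clips)  # ['segment_0_0_12.mp4', 'segment_1_45_60.mp4']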
Step 3: Configure the AI Prompts
The prompt configuration directly determines the quality and style of the generated videos:
# Video analysis prompt template
You are a professional video director and screenwriter. Analyze this video carefully and return the analysis as JSON according to the requirements below:

## Requirements:
1. **Theme**: Summarize the video's core theme and highlights
2. **Emotional tone**: Identify the overall emotional tone (inspiring, relaxed, serious, etc.)
3. **Key scenes**: Identify the 3-5 most engaging scene segments
4. **Narration angle**: Suggest the most suitable narration style and entry point
5. **Target audience**: Describe the audience the video is suited to

## Output format:
{
  "theme": "core theme of the video",
  "emotion": "emotional tone",
  "key_scenes": [
    {
      "start_time": start time in seconds,
      "end_time": end time in seconds,
      "description": "scene description",
      "importance_score": importance rating (1-10)
    }
  ],
  "narration_style": "suggested narration style",
  "target_audience": "target audience description",
  "hook_points": ["hook 1", "hook 2", "hook 3"]
}

Make the analysis accurate and specific so the downstream automated steps receive high-quality data.
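In practice the model often wraps its JSON in Markdown code fences or adds surrounding prose, so it is worth normalizing the reply before handing it to json.loads. A small defensive parser (the failure modes are assumptions from general LLM behavior, not part of the original workflow):

import json
import re

def parse_analysis(reply: str) -> dict:
    """Extract the JSON object from a model reply that may include
    Markdown fences or surrounding prose."""
    # Strip ```json ... ``` fences if present
    fenced = re.search(r"```(?:json)?\s*(.*?)```", reply, re.DOTALL)
    candidate = fenced.group(1) if fenced else reply
    # Fall back to the outermost pair of braces
    start, end = candidate.find("{"), candidate.rfind("}")
    if start == -1 or end == -1:
        raise ValueError("no JSON object found in model reply")
    return json.loads(candidate[start:end + 1])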
Step 4: Configure TTS Speech Synthesis
Configure a high-quality text-to-speech service with support for multiple voice styles:
import os
import requests
from typing import Dict, Any

class TTSProcessor:
    def __init__(self, service_type="google"):
        self.service_type = service_type

    def generate_audio(self, text: str, voice_config: Dict[str, Any]) -> str:
        """Generate TTS audio and return it base64-encoded."""
        if self.service_type == "google":
            return self._google_tts(text, voice_config)
        elif self.service_type == "azure":
            return self._azure_tts(text, voice_config)
        else:
            raise ValueError(f"Unsupported TTS service: {self.service_type}")

    def _google_tts(self, text: str, config: Dict[str, Any]) -> str:
        """Google Cloud TTS implementation."""
        url = "https://texttospeech.googleapis.com/v1/text:synthesize"
        payload = {
            "input": {"text": text},
            "voice": {
                "languageCode": config.get("language", "zh-CN"),
                "name": config.get("voice_name", "zh-CN-Standard-A"),
                "ssmlGender": config.get("gender", "NEUTRAL")
            },
            "audioConfig": {
                "audioEncoding": "MP3",
                "speakingRate": config.get("speed", 1.0),
                "pitch": config.get("pitch", 0.0)
            }
        }
        headers = {
            "Authorization": f"Bearer {os.getenv('GOOGLE_TTS_KEY')}",
            "Content-Type": "application/json"
        }
        response = requests.post(url, json=payload, headers=headers)
        response.raise_for_status()
        # The API returns the MP3 bytes base64-encoded in "audioContent"
        return response.json()["audioContent"]

    def _azure_tts(self, text: str, config: Dict[str, Any]) -> str:
        """Azure TTS implementation (not shown in this example)."""
        raise NotImplementedError("Azure TTS is not implemented here")
# Usage example
tts = TTSProcessor("google")
audio_base64 = tts.generate_audio(
    "This is a brilliant video clip showing some truly stunning footage.",
    {
        "language": "zh-CN",
        "voice_name": "zh-CN-Neural2-A",
        "speed": 1.1,
        "pitch": 2.0
    }
)
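The returned string is base64-encoded MP3 data, so it can be decoded and written straight to disk:

import base64

with open("narration.mp3", "wb") as f:  # narration.mp3 is a placeholder name
    f.write(base64.b64decode(audio_base64))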
Step 5: Audio-Video Synchronization
Synchronize the narration precisely with the footage so the voiceover matches what is on screen:
import os
from moviepy.editor import VideoFileClip, AudioFileClip, CompositeAudioClip, vfx

def sync_audio_video(video_path: str, audio_path: str, target_duration: float) -> str:
    """Synchronize the narration audio with a video segment."""
    # Load the video and the narration audio
    video = VideoFileClip(video_path)
    audio = AudioFileClip(audio_path)
    audio_duration = audio.duration

    # Ratio by which the audio must be sped up (>1) or slowed down (<1)
    speed_factor = audio_duration / target_duration
    if audio_duration > target_duration:
        # Audio too long: speed it up (note: speedx shifts pitch as well as tempo)
        adjusted_audio = audio.fx(vfx.speedx, speed_factor)
    elif speed_factor > 0.7:
        # Audio slightly too short: slow it down, but never excessively
        adjusted_audio = audio.fx(vfx.speedx, speed_factor)
    else:
        # Audio much too short: pad the tail with silence instead
        # (a CompositeAudioClip yields zeros wherever no clip is playing)
        adjusted_audio = CompositeAudioClip([audio]).set_duration(target_duration)

    # If the tempo change is large, adjust the video speed as well
    if abs(speed_factor - 1.0) > 0.3:
        video_speed_factor = min(speed_factor * 0.8, 1.2)  # cap the video speed change
        adjusted_video = video.fx(vfx.speedx, video_speed_factor)
    else:
        adjusted_video = video

    # Mux the adjusted audio onto the video and render the result
    final_video = adjusted_video.set_audio(adjusted_audio)
    output_path = f"synced_{os.path.basename(video_path)}"
    final_video.write_videofile(output_path, audio_codec='aac')
    return output_path
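A usage sketch, reusing the placeholder file names from the earlier steps:

# Fit a narration track onto a 12-second segment
synced = sync_audio_video("segment_0_0_12.mp4", "narration.mp3", target_duration=12.0)
print(synced)  # synced_segment_0_0_12.mp4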