#!/usr/bin/env python3
"""
daily_scheduler.py — Animily Music v2 main orchestrator.

Runs daily at 00:30:
1. Precache (Claude Code CLI) → 2 tracks JSON
2. Generate music with ACE-Step
3. Generate images with FLUX
4. Render videos with ffmpeg
5. Save to the upload queue
6. Clean up GPU processes (must finish before the 03:00 auto_shorts run)

Cron: 30 0 * * * cd /home/javamon/project/animily_music && venv/bin/python3 daily_scheduler.py
"""

import glob
import json
import logging
import os
import shutil
import signal
import subprocess
import sys
import time
import urllib.parse
import uuid
from datetime import datetime, date, timedelta
from pathlib import Path
from typing import Optional

import requests

# Project paths
PROJECT_DIR = Path("/home/javamon/project/animily_music")
OUTPUT_DIR = PROJECT_DIR / "outputs"
QUEUE_DIR = PROJECT_DIR / "upload_queue"
UPLOAD_QUEUE_PATH = PROJECT_DIR / "upload_queue.json"
PRECACHE_DIR = PROJECT_DIR / "precache"
LOG_DIR = PROJECT_DIR / "logs"

# API endpoints
ACESTEP_URL = "http://localhost:8001"
COMFYUI_URL = "http://localhost:8189"

# Timeouts
ACESTEP_TIMEOUT = 600      # 10 min per track
COMFYUI_TIMEOUT = 900      # 15 min per image (FLUX needs ~10min)
PIPELINE_DEADLINE = 8100   # 2h15m total (must finish before 03:00)

# Where ComfyUI's SaveImage node writes files with the "flux_music_gen" prefix
COMFYUI_OUTPUT_GLOB = "/home/javamon/ComfyUI/output/flux_music_gen_*.png"

# Upload hour assigned to the n-th track; extra tracks reuse the last slot
UPLOAD_HOURS = (7, 15)

# Logging
LOG_DIR.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(LOG_DIR / "daily.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


# ============================================================
# GPU / Service Management
# ============================================================

def check_health(url: str, endpoint: str = "/health") -> bool:
    """Return True if a GET on ``url + endpoint`` answers with HTTP 200.

    Any requests-level failure (connection refused, timeout, ...) is
    treated as "not healthy" rather than raised.
    """
    try:
        r = requests.get(f"{url}{endpoint}", timeout=5)
    except requests.RequestException:
        return False
    return r.status_code == 200


def _run_quiet(cmd: list) -> None:
    """Run a best-effort command, ignoring its exit status.

    A missing executable is logged instead of raised so that a single
    absent tool cannot abort the remaining cleanup steps.
    """
    try:
        subprocess.run(cmd, capture_output=True)
    except OSError as e:
        logger.warning(f"Could not run {cmd[0]}: {e}")


def _wait_until_healthy(name: str, url: str, endpoint: str) -> bool:
    """Poll a service every 5 s, up to 60 times (5 minutes), until healthy."""
    for i in range(60):
        time.sleep(5)
        if check_health(url, endpoint):
            logger.info(f"{name} ready after {(i+1)*5}s")
            return True
    logger.error(f"{name} failed to start within 5 minutes")
    return False


def start_acestep():
    """Start the ACE-Step server if it is not already running.

    Returns:
        True if the server is (or becomes) healthy, False otherwise.
    """
    if check_health(ACESTEP_URL, "/health"):
        logger.info("ACE-Step already running")
        return True

    logger.info("Starting ACE-Step server...")
    subprocess.Popen(
        ["bash", "-c",
         "cd /home/javamon/ACE-Step-1.5 && source venv/bin/activate && "
         "python -m uvicorn acestep.api_server:app --host 0.0.0.0 --port 8001 --workers 1"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL
    )
    return _wait_until_healthy("ACE-Step", ACESTEP_URL, "/health")


def start_comfyui():
    """Start ComfyUI for FLUX image generation if it is not already running.

    The Qwen-TTS systemd service is stopped and disabled first to free
    memory; kill_gpu_processes() re-enables it.

    Returns:
        True if ComfyUI is (or becomes) healthy, False otherwise.
    """
    if check_health(COMFYUI_URL, "/"):
        logger.info("ComfyUI already running")
        return True

    # Stop TTS service to free GPU memory
    logger.info("Stopping Qwen-TTS to free memory for FLUX...")
    _run_quiet(["sudo", "systemctl", "stop", "qwen-tts.service"])
    _run_quiet(["sudo", "systemctl", "disable", "qwen-tts.service"])
    time.sleep(3)

    logger.info("Starting ComfyUI with --disable-mmap...")
    subprocess.Popen(
        ["bash", "-c",
         "cd /home/javamon/ComfyUI && ./venv/bin/python main.py "
         "--listen 0.0.0.0 --port 8189 --disable-mmap &"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL
    )
    return _wait_until_healthy("ComfyUI", COMFYUI_URL, "/")


def kill_gpu_processes():
    """Kill all GPU processes (ACE-Step, ComfyUI) and clean up.

    Every step is best-effort: a failing or missing command is logged and
    the remaining steps (notably re-enabling Qwen-TTS) still run.
    """
    logger.info("=== GPU Cleanup Started ===")

    # Kill ACE-Step
    _run_quiet(["fuser", "-k", "8001/tcp"])
    _run_quiet(["pkill", "-9", "-f", "acestep.api_server"])

    # Kill ComfyUI
    _run_quiet(["pkill", "-f", "ComfyUI"])
    _run_quiet(["pkill", "-f", "comfyui"])

    time.sleep(3)

    # Force kill if still alive
    _run_quiet(["pkill", "-9", "-f", "acestep.api_server"])
    _run_quiet(["pkill", "-9", "-f", "ComfyUI"])

    # Re-enable TTS service
    logger.info("Re-enabling Qwen-TTS service...")
    _run_quiet(["sudo", "systemctl", "enable", "qwen-tts.service"])
    _run_quiet(["sudo", "systemctl", "start", "qwen-tts.service"])

    # Drop caches
    _run_quiet(["sudo", "bash", "-c", "echo 3 > /proc/sys/vm/drop_caches"])

    logger.info("=== GPU Cleanup Complete ===")


# ============================================================
# Music Generation (ACE-Step)
# ============================================================

def _extract_audio_url(item: dict) -> str:
    """Return the "file" value of the first result entry, or "" if absent.

    ``item["result"]`` may be a JSON-encoded string or an already decoded
    list; anything unparseable is treated as "no result".
    """
    raw = item.get("result", "[]")
    if isinstance(raw, str):
        try:
            results = json.loads(raw)
        except ValueError:
            results = []
    else:
        results = raw

    if results and isinstance(results, list) and isinstance(results[0], dict):
        return results[0].get("file", "") or ""
    return ""


def generate_music(track: dict, output_path: Path) -> bool:
    """Generate music for one track through the ACE-Step API.

    Submits the task to ``/release_task``, polls ``/query_result`` every
    10 s until it completes, fails, or ACESTEP_TIMEOUT elapses, then
    downloads the produced audio to ``output_path``.

    Args:
        track: Track description; the keys read here are id, duration,
            bpm, caption, lyrics, key_scale and time_signature.
        output_path: Destination of the downloaded audio file.

    Returns:
        True if audio was generated and downloaded, False otherwise.
    """
    logger.info(f"Generating music: {track['id']} ({track['duration']}s, {track['bpm']}bpm)")

    payload = {
        "caption": track["caption"],
        "lyrics": track["lyrics"],
        "bpm": track["bpm"],
        "key_scale": track["key_scale"],
        "time_signature": track["time_signature"],
        "audio_duration": track["duration"],
        "batch_size": 1,
        "thinking": True,
        "inference_steps": 8,
        "guidance_scale": 7.0,
        "audio_format": "wav"
    }

    try:
        # Submit task
        r = requests.post(f"{ACESTEP_URL}/release_task", json=payload, timeout=30)
        if r.status_code != 200:
            logger.error(f"ACE-Step submit failed: {r.status_code} {r.text[:200]}")
            return False

        result = r.json()
        # ACE-Step returns {"data": {"task_id": "...", "status": "queued"}, "code": 200}
        task_id = result.get("data", {}).get("task_id")
        if not task_id:
            logger.error(f"No task_id in response: {result}")
            return False

        logger.info(f"Task submitted: {task_id}")

        # Poll for completion
        start_time = time.time()
        last_progress_log = start_time
        while time.time() - start_time < ACESTEP_TIMEOUT:
            time.sleep(10)
            try:
                poll_r = requests.post(
                    f"{ACESTEP_URL}/query_result",
                    json={"task_id_list": [task_id]},
                    timeout=15
                )
                items = poll_r.json().get("data", []) if poll_r.status_code == 200 else []
                if items:
                    item = items[0] if isinstance(items, list) else items
                    status = item.get("status")

                    if status == 1:  # completed
                        file_url = _extract_audio_url(item)
                        if file_url:
                            return download_audio(file_url, output_path)
                        logger.error(f"Completed but no audio: {item}")
                        return False
                    elif status == 2:  # failed
                        logger.error(f"Task failed: {item}")
                        return False
                    # status == 0: still running
            except (requests.RequestException, ValueError) as e:
                # Transient network error or non-JSON body: keep polling
                logger.debug(f"ACE-Step poll error: {e}")

            # Log progress about once a minute, based on time since the
            # previous progress line rather than an exact elapsed value.
            now = time.time()
            if now - last_progress_log >= 60:
                last_progress_log = now
                logger.info(f"  Waiting... ({int(now - start_time)}s elapsed)")

        logger.error(f"ACE-Step timeout after {ACESTEP_TIMEOUT}s")
        return False

    except Exception as e:
        logger.error(f"Music generation error: {e}")
        return False


def download_audio(file_url: str, output_path: Path) -> bool:
    """Download generated audio from ACE-Step to ``output_path``.

    ``file_url`` may be an absolute URL, a server-relative path starting
    with "/", or a bare file path which is passed to ``/v1/audio?path=``.

    Returns:
        True if the file was saved and is at least 10 KB, False otherwise.
    """
    if file_url.startswith("/"):
        download_url = f"{ACESTEP_URL}{file_url}"
    elif file_url.startswith("http"):
        download_url = file_url
    else:
        download_url = f"{ACESTEP_URL}/v1/audio?path={urllib.parse.quote(file_url)}"

    try:
        r = requests.get(download_url, timeout=120)
        r.raise_for_status()
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, "wb") as f:
            f.write(r.content)
        size_kb = os.path.getsize(output_path) // 1024
        if size_kb < 10:
            logger.error(f"Downloaded file too small: {size_kb}KB")
            return False
        logger.info(f"  Audio downloaded: {size_kb}KB")
        return True
    except Exception as e:
        logger.error(f"Download failed: {e}")
        return False


def _load_upload_queue() -> list:
    """Read upload_queue.json; return [] if it is missing or unusable."""
    if not UPLOAD_QUEUE_PATH.exists():
        return []
    try:
        queue = json.loads(UPLOAD_QUEUE_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:
        logger.warning(f"Could not read {UPLOAD_QUEUE_PATH.name}, starting a new queue: {e}")
        return []
    if not isinstance(queue, list):
        logger.warning(f"{UPLOAD_QUEUE_PATH.name} does not contain a list, starting a new queue")
        return []
    return queue


def add_to_upload_queue(track_data: dict, video_path: str, thumbnail_path: str, upload_hour: int):
    """Append an entry to upload_queue.json (auto_shorts-style queue).

    Args:
        track_data: Track description; ``id`` and ``youtube_title`` are
            required, ``youtube_tags`` and ``playlist_category`` optional.
        video_path: Path of the rendered video.
        thumbnail_path: Path of the thumbnail image.
        upload_hour: Hour of day the entry is scheduled for.
    """
    queue = _load_upload_queue()

    entry = {
        "schedule_key": track_data["id"],
        "upload_hour": upload_hour,
        "channel": "animily_music",
        "video_path": video_path,
        "thumbnail_path": thumbnail_path,
        "title": track_data["youtube_title"],
        "youtube_tags": track_data.get("youtube_tags", []),
        "playlist_category": track_data.get("playlist_category", ""),
        "created_date": datetime.now().strftime("%Y-%m-%d"),
        "uploaded": False,
    }
    queue.append(entry)

    # Write to a sibling temp file and swap it in, so a reader never sees
    # a partially written queue.
    tmp_path = UPLOAD_QUEUE_PATH.with_name(UPLOAD_QUEUE_PATH.name + ".tmp")
    tmp_path.write_text(json.dumps(queue, ensure_ascii=False, indent=2), encoding="utf-8")
    os.replace(tmp_path, UPLOAD_QUEUE_PATH)
    logger.info(f"Queued for upload at {upload_hour}:00: {track_data['youtube_title']}")


# FLUX.2 workflow (1344x768, 16:9)
FLUX_WORKFLOW = {
    "1": {"class_type": "UnetLoaderGGUF", "inputs": {"unet_name": "flux2-dev-Q8_0.gguf"}},
    "2": {"class_type": "LoraLoader", "inputs": {
        "model": ["1", 0],
        "clip": ["3", 0],
        "lora_name": "flux2_turbo_comfy.safetensors",
        "strength_model": 1.0,
        "strength_clip": 0.0,
    }},
    "3": {"class_type": "CLIPLoader", "inputs": {
        "clip_name": "mistral_3_small_flux2_bf16.safetensors",
        "type": "flux2",
    }},
    "6": {"class_type": "CLIPTextEncode", "inputs": {"clip": ["3", 0], "text": ""}},
    "7": {"class_type": "CLIPTextEncode", "inputs": {"clip": ["3", 0], "text": ""}},
    "5": {"class_type": "EmptyLatentImage", "inputs": {
        "width": 1344,
        "height": 768,
        "batch_size": 1,
    }},
    "4": {"class_type": "KSampler", "inputs": {
        "model": ["2", 0],
        "positive": ["6", 0],
        "negative": ["7", 0],
        "latent_image": ["5", 0],
        "seed": 0,
        "steps": 8,
        "cfg": 1.0,
        "sampler_name": "euler",
        "scheduler": "simple",
        "denoise": 1.0,
    }},
    "8": {"class_type": "VAELoader", "inputs": {"vae_name": "flux2-ae.safetensors"}},
    "10": {"class_type": "VAEDecodeTiled", "inputs": {
        "vae": ["8", 0],
        "samples": ["4", 0],
        "tile_size": 512,
        "overlap": 64,
        "temporal_size": 64,
        "temporal_overlap": 8,
    }},
    "9": {"class_type": "SaveImage", "inputs": {
        "images": ["10", 0],
        "filename_prefix": "flux_music_gen",
    }},
}


def generate_image(prompt: str, output_path: Path) -> bool:
    """Generate an image with FLUX via ComfyUI and store it at ``output_path``.

    Queues a copy of FLUX_WORKFLOW with a random seed, polls
    ``/history/<prompt_id>`` every 5 s until the run succeeds, errors, or
    COMFYUI_TIMEOUT elapses. On success the newest matching PNG in the
    ComfyUI output directory is copied and all matching PNGs are removed;
    if none exists, the image is fetched through the ComfyUI API instead.

    Returns:
        True if an image was saved, False otherwise.
    """
    logger.info(f"Generating image: {prompt[:80]}...")

    # JSON round-trip gives a deep copy so the template is never mutated
    workflow = json.loads(json.dumps(FLUX_WORKFLOW))
    workflow["4"]["inputs"]["seed"] = int.from_bytes(os.urandom(4), "big")
    workflow["6"]["inputs"]["text"] = prompt
    workflow["7"]["inputs"]["text"] = "text, watermark, logo, signature, words, letters, blurry, low quality"

    try:
        # Queue prompt
        r = requests.post(
            f"{COMFYUI_URL}/prompt",
            json={"prompt": workflow},
            timeout=30
        )
        if r.status_code != 200:
            logger.error(f"ComfyUI queue failed: {r.status_code} {r.text[:200]}")
            return False

        prompt_id = r.json().get("prompt_id")
        if not prompt_id:
            logger.error("No prompt_id returned")
            return False

        logger.info(f"ComfyUI prompt queued: {prompt_id}")

        # Poll for completion
        start_time = time.time()
        while time.time() - start_time < COMFYUI_TIMEOUT:
            time.sleep(5)
            try:
                hist_r = requests.get(f"{COMFYUI_URL}/history/{prompt_id}", timeout=10)
                if hist_r.status_code != 200:
                    continue
                hist = hist_r.json()
                if prompt_id not in hist:
                    continue

                status = hist[prompt_id].get("status", {}).get("status_str", "")
                if status == "success":
                    # Find output image
                    pngs = sorted(glob.glob(COMFYUI_OUTPUT_GLOB))
                    if pngs:
                        shutil.copy2(pngs[-1], output_path)
                        for p in pngs:
                            os.remove(p)
                        elapsed = int(time.time() - start_time)
                        logger.info(f"  Image generated in {elapsed}s")
                        return True
                    # Try outputs dict as fallback
                    outputs = hist[prompt_id].get("outputs", {})
                    if "9" in outputs and outputs["9"].get("images"):
                        img_info = outputs["9"]["images"][0]
                        return download_comfyui_image(img_info, output_path)
                    # The run is finished, so polling further cannot
                    # produce an image; fail now instead of at timeout.
                    logger.error("ComfyUI reported success but produced no image")
                    return False
                elif status == "error":
                    logger.error("ComfyUI generation error")
                    return False
            except Exception as e:
                # Best-effort polling: note the problem and try again
                logger.debug(f"ComfyUI poll error: {e}")
                continue

        logger.error(f"ComfyUI timeout after {COMFYUI_TIMEOUT}s")
        return False

    except Exception as e:
        logger.error(f"Image generation error: {e}")
        return False


def download_comfyui_image(img_info: dict, output_path: Path) -> bool:
    """Download an image from ComfyUI's ``/view`` endpoint.

    Args:
        img_info: Image descriptor with ``filename`` and optional
            ``subfolder`` / ``type`` keys.
        output_path: Destination file; parent directories are created.

    Returns:
        True if the image was saved, False otherwise.
    """
    try:
        filename = img_info["filename"]
        subfolder = img_info.get("subfolder", "")
        img_type = img_info.get("type", "output")

        r = requests.get(
            f"{COMFYUI_URL}/view",
            params={"filename": filename, "subfolder": subfolder, "type": img_type},
            timeout=30
        )
        if r.status_code != 200:
            logger.error(f"Image download failed: {r.status_code}")
            return False

        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, "wb") as f:
            f.write(r.content)

        logger.info(f"Image saved: {output_path}")
        return True

    except Exception as e:
        logger.error(f"Image download error: {e}")
        return False


# ============================================================
# Video Rendering (ffmpeg)
# ============================================================

def render_video(audio_path: Path, image_path: Path, output_path: Path, duration: int) -> bool:
    """Render a video: still image + Ken Burns zoom + watermark + audio.

    Args:
        audio_path: Audio track to mux in.
        image_path: Still image used as the video source.
        output_path: Destination video file; parent directories are created.
        duration: Video length in seconds.

    Returns:
        True if ffmpeg exited successfully, False on failure or after the
        5-minute ffmpeg timeout.
    """
    logger.info(f"Rendering video: {output_path.name} ({duration}s)")
    output_path.parent.mkdir(parents=True, exist_ok=True)

    font_path = "/usr/share/fonts/truetype/ONE_Mobile_Bold.otf"
    total_frames = duration * 30  # 30 fps output

    # Ken Burns: slow zoom from 1.0 to 1.05 over duration
    filter_complex = (
        f"[0:v]scale=2688:1536,zoompan=z='1+0.05*on/{total_frames}'"
        f":x='iw/2-(iw/zoom/2)':y='ih/2-(ih/zoom/2)'"
        f":d={total_frames}:s=1344x768:fps=30[v];"
        f"[v]drawtext=text='ANIMILY'"
        f":fontfile={font_path}"
        f":fontsize=28:fontcolor=white@0.4"
        f":x=w-tw-60:y=30"
        f":shadowcolor=black@0.3:shadowx=1:shadowy=1[vout]"
    )

    cmd = [
        "ffmpeg", "-y",
        "-loop", "1", "-i", str(image_path),
        "-i", str(audio_path),
        "-filter_complex", filter_complex,
        "-map", "[vout]", "-map", "1:a",
        "-c:v", "libx264", "-preset", "medium", "-crf", "23",
        "-c:a", "aac", "-b:a", "192k",
        "-t", str(duration),
        "-pix_fmt", "yuv420p",
        "-movflags", "+faststart",
        str(output_path)
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
        if result.returncode != 0:
            logger.error(f"ffmpeg failed: {result.stderr[-500:]}")
            return False

        size_mb = output_path.stat().st_size / (1024 * 1024)
        logger.info(f"Video rendered: {output_path} ({size_mb:.1f}MB)")
        return True

    except subprocess.TimeoutExpired:
        logger.error("ffmpeg timeout (5 min)")
        return False
    except Exception as e:
        logger.error(f"Render error: {e}")
        return False


# ============================================================
# Upload Queue
# ============================================================

def save_to_queue(track: dict, video_path: Path, image_path: Path, upload_time: str):
    """Write one per-track JSON file into QUEUE_DIR.

    The file is named ``<track id>_<YYYYMMDD>.json`` and records the
    video/thumbnail paths, YouTube metadata and the scheduled time.
    """
    QUEUE_DIR.mkdir(parents=True, exist_ok=True)
    now = datetime.now()

    queue_item = {
        "id": track["id"],
        "video_path": str(video_path),
        "thumbnail_path": str(image_path),
        "youtube_title": track["youtube_title"],
        "youtube_tags": track["youtube_tags"],
        "playlist_category": track["playlist_category"],
        "scheduled_time": upload_time,
        "status": "pending",
        "created_at": now.isoformat()
    }

    queue_file = QUEUE_DIR / f"{track['id']}_{now.strftime('%Y%m%d')}.json"
    with open(queue_file, "w", encoding="utf-8") as f:
        json.dump(queue_item, f, ensure_ascii=False, indent=2)

    logger.info(f"Queued for upload at {upload_time}: {queue_file.name}")


# ============================================================
# Main Pipeline
# ============================================================

def run_precache(target_date: str) -> Optional[dict]:
    """Run daily_precache.py for ``target_date`` and load its JSON output.

    Returns:
        The parsed contents of ``precache/<target_date>.json``, or None if
        the script fails, times out (180 s), or the file is missing.
    """
    logger.info("Running precache...")
    try:
        result = subprocess.run(
            [str(PROJECT_DIR / "venv/bin/python3"),
             str(PROJECT_DIR / "daily_precache.py"),
             "--date", target_date],
            capture_output=True, text=True, timeout=180,
            cwd=str(PROJECT_DIR)
        )
    except subprocess.TimeoutExpired:
        logger.error("Precache timeout (3 min)")
        return None

    if result.returncode != 0:
        logger.error(f"Precache failed: {result.stderr[:500]}")
        return None

    # Load from file
    precache_file = PRECACHE_DIR / f"{target_date}.json"
    if not precache_file.exists():
        logger.error(f"Precache file not found: {precache_file}")
        return None

    with open(precache_file, "r", encoding="utf-8") as f:
        return json.load(f)


def process_track(track: dict, work_dir: Path) -> bool:
    """Process a single track end to end: music → image → video.

    Output files are ``<track id>.wav/.png/.mp4`` inside ``work_dir``.

    Returns:
        True if all three steps succeeded, False at the first failure.
    """
    track_id = track["id"]
    logger.info(f"--- Processing {track_id} ---")

    audio_path = work_dir / f"{track_id}.wav"
    image_path = work_dir / f"{track_id}.png"
    video_path = work_dir / f"{track_id}.mp4"

    # Step 1: Generate music
    if not generate_music(track, audio_path):
        logger.error(f"{track_id}: Music generation failed")
        return False

    # Step 2: Generate image
    if not generate_image(track["image_prompt"], image_path):
        logger.error(f"{track_id}: Image generation failed")
        return False

    # Step 3: Render video
    if not render_video(audio_path, image_path, video_path, track["duration"]):
        logger.error(f"{track_id}: Video render failed")
        return False

    return True


def _deadline_exceeded(pipeline_start: float) -> bool:
    """Log and return True once PIPELINE_DEADLINE seconds have elapsed."""
    if time.time() - pipeline_start > PIPELINE_DEADLINE:
        logger.error("Pipeline deadline exceeded!")
        return True
    return False


def main():
    """Run the daily pipeline for tomorrow's publish date.

    Phases run in sequence so only one GPU service is up at a time: all
    music first (ACE-Step), then all images (ComfyUI/FLUX), then video
    rendering and queueing. GPU cleanup always runs at the end.

    Returns:
        Process exit code: 0 if at least one track was queued, else 1.
    """
    pipeline_start = time.time()
    today = date.today()
    target_date = (today + timedelta(days=1)).strftime("%Y-%m-%d")

    logger.info("=" * 60)
    logger.info(f"=== Animily Music Daily Scheduler: {today} ===")
    logger.info(f"=== Target publish date: {target_date} ===")
    logger.info("=" * 60)

    success_count = 0

    try:
        # Step 1: Precache
        precache_data = run_precache(target_date)
        if not precache_data:
            logger.error("Precache failed. Aborting.")
            return 1

        tracks = precache_data.get("tracks", [])
        if not tracks:
            logger.error("No tracks in precache. Aborting.")
            return 1

        logger.info(f"Theme: {precache_data.get('theme', 'N/A')}")
        logger.info(f"Tracks to process: {len(tracks)}")

        # Step 2: Start ACE-Step
        if not start_acestep():
            logger.error("Cannot start ACE-Step. Aborting.")
            return 1

        # Work directory
        work_dir = OUTPUT_DIR / target_date
        work_dir.mkdir(parents=True, exist_ok=True)

        # Step 3: Generate music for all tracks first (before switching to FLUX)
        audio_paths = {}
        for track in tracks:
            audio_path = work_dir / f"{track['id']}.wav"
            if generate_music(track, audio_path):
                audio_paths[track["id"]] = audio_path
            else:
                logger.warning(f"Skipping {track['id']}: music generation failed")

            # Check deadline
            if _deadline_exceeded(pipeline_start):
                break

        # Kill ACE-Step before starting FLUX
        logger.info("Killing ACE-Step for FLUX...")
        _run_quiet(["fuser", "-k", "8001/tcp"])
        _run_quiet(["pkill", "-9", "-f", "acestep.api_server"])
        time.sleep(5)

        # Step 4: Start ComfyUI for FLUX (only if there is audio to pair with)
        if not audio_paths:
            logger.warning("No audio generated. Skipping image generation.")
        elif not start_comfyui():
            logger.error("Cannot start ComfyUI. Skipping image generation.")
        else:
            # Generate images
            for track in tracks:
                if track["id"] not in audio_paths:
                    continue
                image_path = work_dir / f"{track['id']}.png"
                if not generate_image(track["image_prompt"], image_path):
                    logger.warning(f"Skipping {track['id']}: image generation failed")
                    del audio_paths[track["id"]]

                if _deadline_exceeded(pipeline_start):
                    break

        # Kill ComfyUI before rendering
        logger.info("Killing ComfyUI for render phase...")
        _run_quiet(["pkill", "-f", "ComfyUI"])
        time.sleep(3)

        # Step 5: Render videos
        for i, track in enumerate(tracks):
            if track["id"] not in audio_paths:
                continue

            audio_path = work_dir / f"{track['id']}.wav"
            image_path = work_dir / f"{track['id']}.png"
            video_path = work_dir / f"{track['id']}.mp4"

            if not image_path.exists():
                continue

            if render_video(audio_path, image_path, video_path, track["duration"]):
                # Save to queue; the slot follows the track's position in
                # the precache list, later tracks share the last slot.
                upload_hour = UPLOAD_HOURS[i] if i < len(UPLOAD_HOURS) else UPLOAD_HOURS[-1]
                add_to_upload_queue(track, str(video_path), str(image_path), upload_hour)
                success_count += 1
            else:
                logger.warning(f"Skipping {track['id']}: render failed")

        logger.info(f"Pipeline complete: {success_count}/{len(tracks)} tracks successful")

    except Exception as e:
        logger.error(f"Pipeline error: {e}", exc_info=True)

    finally:
        # CRITICAL: Always clean up GPU processes
        kill_gpu_processes()

        elapsed = int(time.time() - pipeline_start)
        logger.info(f"Total time: {elapsed // 60}m {elapsed % 60}s")
        logger.info("=" * 60)

    return 0 if success_count > 0 else 1


if __name__ == "__main__":
    sys.exit(main())