From 173ed906422a466416376dbbfa2a6ea9e7cc08a3 Mon Sep 17 00:00:00 2001
From: Azeufack Noupeu Willy
Date: Mon, 24 Nov 2025 19:36:50 +0100
Subject: [PATCH] feat(S3-06): integrate orchestrator in main.js and complete assembly module

---
Review notes (text in this section is ignored by `git am`):
- Line breaks and indentation were restored from a whitespace-collapsed copy
  of this patch. Whitespace-only context lines and the indentation of removed
  and context lines are reconstructed; check them against the target tree
  before applying.
- The blob ids on the `index` lines belong to the original revision of this
  patch, not to the revised content below.
- main.js: status messages go through one helper that skips a destroyed
  window; a missing extractor or assembly module no longer crashes startup or
  reports a false "completed" status.
- assembly.js: polling is bounded by a timeout, file I/O is asynchronous, and
  a missing ASSEMBLYAI_API_KEY fails fast.

 main.js                                       |  67 ++++++++
 package.json                                  |   1 +
 .../modules/transcription-remote/assembly.js  | 194 ++++++++++++++++--
 3 files changed, 254 insertions(+), 8 deletions(-)

diff --git a/main.js b/main.js
index 1482f06..b87afea 100644
--- a/main.js
+++ b/main.js
@@ -33,6 +33,73 @@ console.log(`${mapFunctions.size} Function modules loaded`);
 console.log("--------------------------------------------------------------------------------");
 
 
+// ======================== S3-06 : PIPELINE ORCHESTRATOR ========================
+// Connects the audio extractor to the transcription module: each 'audio_ready'
+// event starts one run of the "assembly" module and reports progress to the UI.
+
+/**
+ * Sends a status object to the renderer on the 'pipeline-status' channel.
+ * Skipped when there is no main window or it has already been destroyed.
+ *
+ * @param {object} payload - Status fields (sessionId, status, message, error).
+ */
+function sendPipelineStatus(payload) {
+  if (mainWindow && !mainWindow.isDestroyed()) {
+    mainWindow.webContents.send('pipeline-status', payload);
+  }
+}
+
+// Get audioEvents from ffmpegExtractor module
+const ffmpegExtractor = mapFunctions.get("extraction-video-to-audio");
+const audioEvents = ffmpegExtractor?.audioEvents;
+
+if (!audioEvents) {
+  // Keep the app starting even if the extractor module was not loaded.
+  console.warn('⚠️ [S3-06] No audioEvents on "extraction-video-to-audio"; pipeline orchestrator disabled.');
+} else {
+  console.log('🎯 [S3-06] Pipeline Orchestrator ready. Listening for audio_ready events...');
+
+  audioEvents.on('audio_ready', async ({ audioPath, sessionId } = {}) => {
+    console.log(`✅ [Pipeline] Audio ready: ${sessionId}`);
+    console.log(`📁 Audio path: ${audioPath}`);
+    sendPipelineStatus({ sessionId, status: 'audio_ready', message: 'Audio bereit' });
+
+    try {
+      console.log(`🚀 [Pipeline] Starting transcription: ${sessionId}`);
+      sendPipelineStatus({
+        sessionId,
+        status: 'transcription_started',
+        message: 'Transkription gestartet',
+      });
+
+      const assemblyModule = mapFunctions.get("assembly");
+      if (typeof assemblyModule?.run !== 'function') {
+        // Throw rather than warn, so the UI is not told that a transcription
+        // completed when no transcription module actually ran.
+        throw new Error('Assembly module not found or missing run function');
+      }
+
+      await assemblyModule.run(audioPath);
+
+      console.log(`✅ [Pipeline] Transcription completed: ${sessionId}`);
+      sendPipelineStatus({
+        sessionId,
+        status: 'transcription_completed',
+        message: 'Transkription abgeschlossen',
+      });
+    } catch (error) {
+      console.error(`❌ [Pipeline] Transcription failed: ${sessionId}`);
+      console.error(`   Error:`, error.message);
+      sendPipelineStatus({
+        sessionId,
+        status: 'failed_transcription_start',
+        message: 'Fehler beim Transkriptionsstart',
+        error: error.message,
+      });
+    }
+  });
+}
+// ===============================================================================
 
 // --------------------------------------------------------- CLI COMMANDS --------------------------------------------------------- //
 
diff --git a/package.json b/package.json
index f85f30e..3db9751 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,7 @@
 {
   "dependencies": {
     "@types/axios": "^0.9.36",
+    "axios": "^1.13.2",
     "cli-progress": "^3.12.0",
     "dotenv": "^17.2.3",
     "electron": "^39.1.1",
diff --git a/services/modules/transcription-remote/assembly.js b/services/modules/transcription-remote/assembly.js
index 3b28dad..6f6b2f2 100644
--- a/services/modules/transcription-remote/assembly.js
+++ b/services/modules/transcription-remote/assembly.js
@@ -1,8 +1,186 @@
-module.exports = {
-    name:"assembly", // Unique name for our function that will later be used to get the function from the map via "mapFunctions.get("example").function()"
-    type:"transcription", // value used to differentiate each module to order them in the UI
-    displayname:"Assembly", // The displayname used within the UI
-    async function(parameter){
-        // TODO add code to actually process the audio file
-    }
-}
\ No newline at end of file
+require('dotenv/config');
+const axios = require('axios');
+const fs = require('fs');
+const path = require('path');
+
+const API_KEY = process.env.ASSEMBLYAI_API_KEY;
+const BASE_URL = 'https://api.assemblyai.com/v2';
+
+// Inputs matching this pattern are treated as remote audio URLs, not files.
+const REMOTE_URL_PATTERN = /^https?:\/\//i;
+
+// Delay between two status polls, and the longest total time spent polling.
+const DEFAULT_POLL_INTERVAL_MS = 3000;
+const DEFAULT_POLL_TIMEOUT_MS = 60 * 60 * 1000;
+
+/**
+ * Uploads a local audio file to AssemblyAI.
+ *
+ * @param {string} audioPath - Path of the audio file on disk.
+ * @returns {Promise<string>} The `upload_url` field of the upload response.
+ */
+async function uploadAudio(audioPath) {
+  // Read asynchronously: a synchronous read of a large recording would block
+  // the process (and with it the Electron UI) until the file is loaded.
+  const audioData = await fs.promises.readFile(audioPath);
+
+  const response = await axios.post(`${BASE_URL}/upload`, audioData, {
+    headers: {
+      'authorization': API_KEY,
+      'content-type': 'application/octet-stream'
+    }
+  });
+
+  return response.data.upload_url;
+}
+
+/**
+ * Derives a session id from a local path or an http(s) URL: the last path
+ * segment with its file extension removed.
+ *
+ * @param {string} inputPath - Local file path or http(s) URL.
+ * @returns {string} The derived session id.
+ */
+function getSessionId(inputPath) {
+  // Only http(s) inputs are parsed as URLs, the same rule `run` applies, so
+  // a drive-letter path such as "C:\audio\a.wav" is handled as a file path.
+  if (REMOTE_URL_PATTERN.test(inputPath)) {
+    try {
+      const base = path.basename(new URL(inputPath).pathname);
+      return base.replace(/\.[^.]+$/, '');
+    } catch (err) {
+      // Not a parsable URL after all; fall through to plain path handling.
+    }
+  }
+
+  return path.basename(inputPath, path.extname(inputPath));
+}
+
+/**
+ * Creates a transcription job with speaker labels and language detection.
+ *
+ * @param {string} audioUrl - URL of the audio (uploaded or external).
+ * @returns {Promise<string>} Id of the created transcript.
+ */
+async function createTranscript(audioUrl) {
+  const response = await axios.post(`${BASE_URL}/transcript`, {
+    audio_url: audioUrl,
+    speaker_labels: true,
+    language_detection: true
+  }, {
+    headers: {
+      'authorization': API_KEY,
+      'content-type': 'application/json'
+    }
+  });
+
+  return response.data.id;
+}
+
+/**
+ * Polls a transcript until its status is 'completed' or 'error'.
+ *
+ * @param {string} transcriptId - Id returned by `createTranscript`.
+ * @param {object} [options] - Optional polling settings.
+ * @param {number} [options.intervalMs] - Delay between two polls.
+ * @param {number} [options.timeoutMs] - Maximum total time to keep polling.
+ * @returns {Promise<object>} Response body of the completed transcript.
+ * @throws {Error} If the job reports 'error' or the timeout is exceeded.
+ */
+async function pollTranscript(transcriptId, options = {}) {
+  const {
+    intervalMs = DEFAULT_POLL_INTERVAL_MS,
+    timeoutMs = DEFAULT_POLL_TIMEOUT_MS
+  } = options;
+  const deadline = Date.now() + timeoutMs;
+
+  while (true) {
+    const response = await axios.get(`${BASE_URL}/transcript/${transcriptId}`, {
+      headers: { 'authorization': API_KEY }
+    });
+
+    const { status, error } = response.data;
+
+    if (status === 'completed') {
+      return response.data;
+    }
+    if (status === 'error') {
+      throw new Error(`Transcription failed: ${error}`);
+    }
+
+    // Bound the loop: a job that never reaches 'completed' or 'error' must
+    // not keep this promise pending forever.
+    if (Date.now() + intervalMs > deadline) {
+      throw new Error(`Transcription timed out after ${timeoutMs} ms (last status: ${status})`);
+    }
+
+    await new Promise(resolve => setTimeout(resolve, intervalMs));
+  }
+}
+
+/**
+ * Writes the transcript as pretty-printed JSON to storage/transcripts.
+ *
+ * @param {object} transcript - Transcript object to persist.
+ * @param {string} sessionId - Used as the file name (without ".json").
+ * @returns {Promise<string>} Path of the written file.
+ */
+async function saveTranscript(transcript, sessionId) {
+  const outputDir = path.join(__dirname, '..', '..', '..', 'storage', 'transcripts');
+
+  // With `recursive: true`, mkdir succeeds when the directory already exists.
+  await fs.promises.mkdir(outputDir, { recursive: true });
+
+  const outputPath = path.join(outputDir, `${sessionId}.json`);
+  await fs.promises.writeFile(outputPath, JSON.stringify(transcript, null, 2));
+
+  console.log(`✅ Transcript saved: ${outputPath}`);
+  return outputPath;
+}
+
+module.exports = {
+  name: "assembly",
+  type: "transcription",
+  displayname: "AssemblyAI",
+
+  /**
+   * Transcribes one audio source and stores the result on disk.
+   *
+   * @param {string} audioPath - Local audio file path or http(s) URL.
+   * @returns {Promise<object>} The completed transcript.
+   * @throws {Error} If the API key or file is missing, or any step fails.
+   */
+  run: async function(audioPath) {
+    try {
+      if (!API_KEY) {
+        throw new Error('ASSEMBLYAI_API_KEY is not set');
+      }
+
+      // Determine if audioPath is an external URL or a local file
+      let audioUrl;
+      if (REMOTE_URL_PATTERN.test(audioPath)) {
+        console.log('🔗 Using external audio URL...');
+        audioUrl = audioPath;
+      } else {
+        if (!fs.existsSync(audioPath)) {
+          throw new Error(`Audio file not found: ${audioPath}`);
+        }
+        console.log('📤 Uploading audio file...');
+        audioUrl = await uploadAudio(audioPath);
+      }
+
+      console.log('🔄 Creating transcript job...');
+      const transcriptId = await createTranscript(audioUrl);
+
+      console.log('⏳ Waiting for transcription...');
+      const transcript = await pollTranscript(transcriptId);
+
+      await saveTranscript(transcript, getSessionId(audioPath));
+
+      return transcript;
+    } catch (error) {
+      console.error('❌ Transcription error:', error.message);
+      throw error;
+    }
+  }
+};