import os
import tempfile
import wave
from gtts import gTTS
import pygame
import pyaudio
import keyboard
import cohere
from time import sleep
import requests
from groq import Groq
import ast
import threading
import json


NAME_TO_ID = {
    "base": "S5",
    "shoulder": "S4",
    "elbow": "S3",
    "wrist": "S2",
    "gripper": "S1",
    "clamp": "S1"
}

def ensure_audio_folder(folder="ai_audios"):
    # Creates the folder that stores the AI audio files if it doesn't exist. Returns its path.
    path = os.path.join(os.path.dirname(os.path.abspath(__file__)), folder)
    os.makedirs(path, exist_ok=True)
    return path

def ensure_sequence_file(file="arm_sequences.json"):
    # Creates the file that stores the saved move sequences if it doesn't exist. Returns its path.
    path = os.path.join(os.path.dirname(os.path.abspath(__file__)), file)
    if not os.path.exists(path):
        with open(path, "w") as f:
            json.dump({}, f)  # Start with an empty dict
    return path

def ensure_log_file(file="ai_register.txt"):
    # Creates the file that stores the AI logs if it doesn't exist. Returns its path.
    path = os.path.join(os.path.dirname(os.path.abspath(__file__)), file)
    if not os.path.exists(path):
        with open(path, "w") as f:
            f.write("")  # Create an empty log
    return path

Audios_Folder = ensure_audio_folder()
file_path = ensure_log_file()

def play_audio_async(audio_path):
    # Plays the audio file at audio_path and blocks until playback finishes.
    if os.path.exists(audio_path):
        pygame.mixer.music.load(audio_path)
        pygame.mixer.music.play()
        while pygame.mixer.music.get_busy():
            sleep(0.1)
        pygame.mixer.music.stop()
    else:
        print("The path doesn't exist.")

def translate(text, language_from="es", language_to="en"):
    # Translates text from one language to another using the MyMemory API. Returns the translation.
    url = "https://api.mymemory.translated.net/get"
    params = {
        "q": text,
        "langpair": f"{language_from}|{language_to}"  # Translates from Spanish (es) to English (en) by default. Can be changed.
    }
    try:
        response = requests.get(url, params=params)
        response.raise_for_status()  # Raises an error if the translation request fails.
        data = response.json()
        traduction = data["responseData"]["translatedText"]
        return traduction
    except Exception as e:
        print(f"Error while translating: {str(e)}")
        return None

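# Usage sketch (the exact output depends on the MyMemory API; the result shown is only an
# illustrative assumption): translating a Spanish command before it reaches the model.
#
#   english = translate("mueve la base a 90 grados")  # e.g. "move the base to 90 degrees"
#   if english is not None:
#       print(english)
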
pygame.mixer.init()
client = Groq(api_key="Here goes your api key")  # You must provide your own API keys.
api_key_llm = "Here goes your api key"
co = cohere.Client(api_key_llm)

def speak(text, language="en"):
    # Speaks the given text aloud. The language can be changed. It doesn't return anything.
    files = os.listdir(Audios_Folder)
    nameFile = "Record" + str(len(files)) + ".mp3"
    audio_path = os.path.join(Audios_Folder, nameFile)
    tts = gTTS(text, lang=language)  # Possible values: 'es', 'en', 'ca', 'fr', 'de', etc.
    tts.save(audio_path)

    audio_thread = threading.Thread(target=play_audio_async, args=(audio_path,))
    audio_thread.daemon = True  # The voice will shut down if the main program stops.
    audio_thread.start()

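# Usage sketch: speak() returns immediately because playback runs in a daemon thread.
#
#   speak("Moving the base to 90 degrees")
#   print("This prints while the audio is still playing.")
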
def record_audio_ins(sample_frequency=16000, canals=1, fragment=1024):
    # Records audio while the INS key is held down and stops when it is released. Returns the frames and the sample frequency.
    p = pyaudio.PyAudio()
    stream = p.open(
        format=pyaudio.paInt16,
        channels=canals,
        rate=sample_frequency,
        input=True,
        frames_per_buffer=fragment)
    print("Press and hold INS to record")
    frames = []
    keyboard.wait("insert")
    print("Recording...")
    while keyboard.is_pressed("insert"):
        data = stream.read(fragment)
        frames.append(data)
    print("Recording finished.")
    stream.stop_stream()
    stream.close()
    p.terminate()
    return frames, sample_frequency

def save_record(frames, sample_frequency):
    # Takes the variables returned by record_audio_ins() and saves them as a WAV file. Returns the path of the audio file.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as audio_temp:
        wf = wave.open(audio_temp.name, mode="wb")
        wf.setnchannels(1)
        wf.setsampwidth(pyaudio.PyAudio().get_sample_size(pyaudio.paInt16))
        wf.setframerate(sample_frequency)
        wf.writeframes(b"".join(frames))
        wf.close()
    return audio_temp.name

def transcribe_audio(Path, language="en"):
    # Transcribes the audio to text with Whisper (via the Groq API). Returns the transcription.
    try:
        with open(Path, "rb") as file:
            transcription = client.audio.transcriptions.create(
                file=(os.path.basename(Path), file.read()),
                model="whisper-large-v3",  # May be changed
                prompt="The audio is from someone commanding a robot arm with servos and degrees",
                response_format="text",
                language=language)  # You can change the language
        return transcription
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return None

def ask(task, maxTokens=250):
    # Asks an online AI (Cohere). Returns the answer.
    response = co.generate(
        model='command-xlarge',  # You can change the model
        prompt=task,
        max_tokens=maxTokens)
    return response.generations[0].text

def register_info(user_text, ai_answer):
    # Registers the AI answer in the file ai_register.txt
    content = f"Input: {user_text} --> Response: {ai_answer}\n\n"
    try:
        with open(file_path, "a", encoding="utf-8") as file:
            file.write(content)
        print("Register added correctly.")
        return ai_answer
    except Exception as e:
        print(f"Error while registering the AI response: {str(e)}")

def change_command_form(english_name):
    # Maps an English servo name to its servo ID. Unknown names fall back to "S1".
    return NAME_TO_ID.get(english_name.lower(), "S1")

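# Usage sketch: converting the servo names produced by main() into controller IDs.
#
#   change_command_form("base")     # -> "S5"
#   change_command_form("Gripper")  # -> "S1" (lookup is case-insensitive)
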
def main_write(user_text):
    # Asks a local AI and returns its answer. You can switch to the online AI by changing ask_local() --> ask().
    answer = ask_local(translate(user_text) + ", Generate a Python list containing tuples with servo positions (in degrees only) and servo names. Format each tuple as (ServoPositionDegrees, ServoName). Use these English servo names: base, shoulder, elbow, wrist, gripper. Extract only the position values that appear in this message. If you cannot find any position values, return an empty list []. Return ONLY the list, don't say anything more; if you add any extra text, the app breaks.")
    register_info(user_text, answer)
    answer = main(answer)
    if answer:
        return answer

def main_record():
    # Records, translates, and asks a local AI. Returns its answer. You can switch to the online AI by changing ask_local() --> ask().
    frames, sample_frequency = record_audio_ins()
    archivo_audio_temp = save_record(frames, sample_frequency)
    print("Transcribing...")
    transcription = transcribe_audio(archivo_audio_temp)
    if transcription:
        answer = ask_local(translate(transcription) + ", Generate a Python list containing tuples with servo positions (in degrees only) and servo names. Format each tuple as (ServoPositionDegrees, ServoName). Use these English servo names: base, shoulder, elbow, wrist, gripper. Extract only the position values that appear in this message, never invent information, and if you cannot find any valid information, return an empty list []. Return ONLY the list; if you send text, the app won't work.")
        register_info(transcription, answer)
        return main(answer)
    else:
        print("The transcription failed")

def main(ai_answer):
    # Checks that the AI answer is valid and converts a string like "[(50, 'base'), (130, 'elbow')]"
    # into a list of (servo_name, degrees) tuples, e.g. [('base', 50), ('elbow', 130)].
    try:
        answer = []
        ai_answer = ast.literal_eval(ai_answer)
        print("AI Answer:", ai_answer)

        if not isinstance(ai_answer, list):
            print("ERROR: AI response isn't a list")
            return False

        for item in ai_answer:
            if not isinstance(item, (tuple, list)) or len(item) != 2:
                print(f"Invalid item: {item}")
                return False

            pos_grads, nom_servo = item

            try:
                pos_grads = int(pos_grads)
            except (ValueError, TypeError):
                print(f"Invalid value: {pos_grads}")
                return False

            nom_servo = nom_servo.lower()

            # Validate the servo name and the angle range
            if (nom_servo not in NAME_TO_ID or
                    not (0 <= pos_grads <= 180)):
                print(f"Error: {nom_servo} or {pos_grads}° is not valid")
                return False

            answer.append((nom_servo, pos_grads))

        return answer if answer else False

    except (SyntaxError, ValueError) as e:
        print(f"Error in the AI response: {e}")
        return False

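# Usage sketch with a hypothetical AI answer string (a real answer depends on the model):
#
#   moves = main("[(90, 'base'), (45, 'elbow')]")
#   # moves == [('base', 90), ('elbow', 45)]
#   if moves:
#       for servo_name, degrees in moves:
#           print(change_command_form(servo_name), degrees)
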
def ask_local(task):
    # Asks a local AI (served with Ollama) and returns its answer.
    url = "http://localhost:-----/api/generate"  # Replace ----- with the port of your local Ollama server
    data = {
        "model": "mistral",  # Examples (you have to install Ollama and the models): llama3, llama3.2:1b, mistral
        "prompt": task,
        "stream": False
    }
    # Make the request
    response = requests.post(url, json=data)
    return response.json()["response"]
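
# Minimal entry-point sketch, assuming the API keys and the Ollama URL above have been filled
# in and a local Ollama server is running. It records one voice command, parses the AI answer,
# and prints the resulting servo ID / angle pairs.
if __name__ == "__main__":
    moves = main_record()
    if moves:
        for servo_name, degrees in moves:
            print(f"{change_command_form(servo_name)} -> {degrees} degrees")
    else:
        print("No valid servo moves were extracted.")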