Skip to content

Commit ff009a3

Browse files
Add files via upload
1 parent a6f0ff0 commit ff009a3

File tree

3 files changed

+858
-0
lines changed

3 files changed

+858
-0
lines changed

ModeloIABrazoRobótico.py

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
import os
2+
import tempfile
3+
import wave
4+
from gtts import gTTS
5+
import pygame
6+
import pyaudio
7+
import keyboard
8+
import cohere
9+
from time import sleep
10+
import requests
11+
from groq import Groq
12+
import ast
13+
import threading
14+
import json
15+
import ast
16+
17+
18+
NAME_TO_ID = {
19+
"base": "S5",
20+
"shoulder": "S4",
21+
"elbow": "S3",
22+
"wrist": "S2",
23+
"gripper": "S1",
24+
"clamp": "S1"
25+
}
26+
27+
def ensure_audio_folder(folder= "ai_audios"):
28+
# Checks if there's an existing folder to save the audios of the AI. If not, then creates one. Returns the path.
29+
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), folder)
30+
os.makedirs(path, exist_ok=True)
31+
return path
32+
33+
def ensure_sequence_file(file= "arm_sequences.json"):
34+
# Checks if there's an existing file to save the sequences of moves. If not, then creates one. Returns the path.
35+
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), file)
36+
if not os.path.exists(path):
37+
with open(path, "w") as f:
38+
json.dump({}, f) # Start with empty dict
39+
return path
40+
41+
def ensure_log_file(file= "ai_register.txt"):
42+
# Checks if there's an existing file to save the logs of the AI. If not, then creates one. Returns the path.
43+
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), file)
44+
if not os.path.exists(path):
45+
with open(path, "w") as f:
46+
f.write("") # Create empty log
47+
return path
48+
49+
Audios_Folder= ensure_audio_folder()
50+
file_path= ensure_log_file()
51+
52+
def play_audio_async(audio_path):
53+
if os.path.exists(audio_path):
54+
pygame.mixer.music.load(audio_path)
55+
pygame.mixer.music.play()
56+
while pygame.mixer.music.get_busy():
57+
sleep(0.1)
58+
pygame.mixer.music.stop()
59+
else:
60+
print("The path doesn't exist.")
61+
62+
63+
def translate(text, language_from= "es", language_to= "en"):
64+
# Translate from one language to another. Returns the transcription.
65+
url = "https://api.mymemory.translated.net/get"
66+
params = {
67+
"q": text,
68+
"langpair": f"{language_from}|{language_to}" # Traduces from Spanish (es) to English (en). Can be changed.
69+
}
70+
try:
71+
response = requests.get(url, params=params)
72+
response.raise_for_status() # Raises an error if the traduction fails.
73+
data = response.json()
74+
traduction = data["responseData"]["translatedText"]
75+
return traduction
76+
except Exception as e:
77+
print(f"Error while translating: {str(e)}")
78+
return None
79+
80+
pygame.mixer.init()
81+
client= Groq(api_key="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") # You must introduce your own API keys.
82+
api_key_llm = "1xZvuPLYfZeNnYQxk365ygyMYjUDK3O8vzs5fhgW"
83+
co = cohere.Client(api_key_llm)
84+
85+
def speak(text, language= "en"):
86+
# Says loudly the text given. The language can be changed. It doesn't return anything.
87+
files = os.listdir(Audios_Folder)
88+
nameFile = "Record" + str(len(files)) + ".mp3"
89+
audio_path = os.path.join(Audios_Folder, nameFile)
90+
tts = gTTS(text, lang= language) # Possible values: 'es', 'en', 'ca', 'fr', 'de', etc.
91+
tts.save(audio_path)
92+
93+
audio_thread = threading.Thread(target=play_audio_async, args=(audio_path,))
94+
audio_thread.daemon = True # The voice will shut down if the main program stops.
95+
audio_thread.start()
96+
97+
98+
def record_audio_ins(sample_frequency=1600, canals=1, fragment=1024):
99+
# Record the audio while pressing "INS", and saves it when releasing the button. Return the frames and the sample_frequency.
100+
p= pyaudio.PyAudio()
101+
stream= p.open(
102+
format=pyaudio.paInt16,
103+
channels=canals,
104+
rate=sample_frequency,
105+
input=True,
106+
frames_per_buffer=fragment)
107+
print("Press and hold INS to record")
108+
frames= []
109+
keyboard.wait("insert")
110+
print("Recording...")
111+
while keyboard.is_pressed("insert"):
112+
data= stream.read(fragment)
113+
frames.append(data)
114+
print("Recording finished.")
115+
stream.stop_stream()
116+
stream.close()
117+
p.terminate()
118+
return frames, sample_frequency
119+
120+
def save_record(frames, sample_frequency):
121+
# Takes the returned variables of the record_audio_ins() function and saves them as an audio. Returns the name of the audio.
122+
with tempfile.NamedTemporaryFile(suffix= ".wav", delete= False) as audio_temp:
123+
wf= wave.open(audio_temp.name, mode= "wb")
124+
wf.setnchannels(1)
125+
wf.setsampwidth(pyaudio.PyAudio().get_sample_size(pyaudio.paInt16))
126+
wf.setframerate(sample_frequency)
127+
wf.writeframes(b"".join(frames))
128+
wf.close()
129+
return audio_temp.name
130+
131+
def transcribe_audio(Path, language= "en"):
132+
# Passes the audio to text, returns the transcription.
133+
try:
134+
with open(Path, "rb") as file:
135+
transcription= client.audio.transcriptions.create(
136+
file= (os.path.basename(Path), file.read()),
137+
model= "whisper-large-v3", # May be changed
138+
prompt= "The audio is from someone commanding a robot arm with servos and degrees",
139+
response_format= "text",
140+
language=language) # You can change the language
141+
return transcription
142+
except Exception as e:
143+
print(f"An error took place: {str(e)}")
144+
return None
145+
146+
def ask(task, maxTokens= 250):
147+
# Ask an online AI. Returns the answer.
148+
response = co.generate(
149+
model='command-xlarge', # You can change the model
150+
prompt=task,
151+
max_tokens= maxTokens)
152+
return response.generations[0].text
153+
154+
def register_info(user_text, ai_answer):
155+
# Registers the AI answer in the file ai_register.txt
156+
content = f"Input: {user_text} --> --> Response: {ai_answer}\n\n"
157+
try:
158+
with open(file_path, "a", encoding="utf-8") as file:
159+
file.write(content)
160+
print("Register added correctly.")
161+
return ai_answer
162+
except Exception as e:
163+
print(f"Error while registering the ai response: {str(e)}")
164+
165+
def change_command_form(english_name):
166+
return NAME_TO_ID.get(english_name.lower(), "S1")
167+
168+
169+
def main_write(user_text):
170+
# Ask a local AI and returns it answer. You can change the AI to online by changing the function ask_local() --> ask()
171+
answer= ask_local(translate(user_text) + ", Generate a Python list containing tuples with servo positions (in degrees only) and servo names. Format each tuple as (ServoPositionDegrees, ServoName). Use these English servo names: base, shoulder, elbow, wrist, gripper. Extract only the position values that appear in this message. If you cannot find any position values, return an empty list []. return ONLY the list, don't say nothing more, if you give any background, the app breakes.")
172+
register_info(user_text, answer)
173+
answer= main(answer)
174+
if answer:
175+
return answer
176+
177+
178+
def main_record():
179+
# Records, translates, and asks a local AI. Returns it answer. You can change the AI to online by changing the function ask_local() --> ask()
180+
frames, sample_frequency= record_audio_ins()
181+
archivo_audio_temp= save_record(frames, sample_frequency)
182+
print("Transcribiendo...")
183+
transcription= transcribe_audio(archivo_audio_temp)
184+
if transcription:
185+
answer= ask_local(translate(transcription) + ", Generate a Python list containing tuples with servo positions (in degrees only) and servo names. Format each tuple as (ServoPositionDegrees, ServoName). Use these English servo names: base, shoulder, elbow, wrist, gripper. Extract only the position values that appear in this message, never invent information, and if you cannot find any valid information, return an empty list []. return ONLY the list, if you send text, the app won't work.")
186+
register_info(transcription, answer)
187+
return main(answer)
188+
else:
189+
print("The transcription failed")
190+
191+
192+
def main(ai_answer):
193+
# Checks if the answer of the AI is valid, and transform an input like "[(S1:50), (S3:130)]" to an output like [(S1:50), (S3:130)]
194+
try:
195+
answer = []
196+
ai_answer = ast.literal_eval(ai_answer)
197+
print("AI Answer:", ai_answer)
198+
199+
if not isinstance(ai_answer, list):
200+
print("ERROR: AI response isn't a list")
201+
return False
202+
203+
for item in ai_answer:
204+
if not isinstance(item, (tuple, list)) or len(item) != 2:
205+
print(f"Invalid item: {item}")
206+
return False
207+
208+
pos_grads, nom_servo = item
209+
210+
try:
211+
pos_grads = int(pos_grads)
212+
except (ValueError, TypeError):
213+
print(f"Invalid value: {pos_grads}")
214+
return False
215+
216+
nom_servo = nom_servo.lower()
217+
218+
# Validar nombre y rango
219+
if (nom_servo not in NAME_TO_ID or
220+
not (0 <= pos_grads <= 180)):
221+
print(f"Error: {nom_servo} o {pos_grads}° no valids")
222+
return False
223+
224+
answer.append((nom_servo, pos_grads))
225+
226+
return answer if answer else False
227+
228+
except SyntaxError as e:
229+
print(f"Error in the AI response: {e}")
230+
return False
231+
232+
233+
def ask_local(task):
234+
# Ask a local AI (with Ollama) and returns its answer.
235+
url = "http://localhost:11434/api/generate" # Add your lokalhost
236+
data = {
237+
"model": "mistral", # Examples (you've got to install Ollama and the models): llama3, llama3.2:1b, mistral
238+
"prompt": task,
239+
"stream": False
240+
}
241+
# Hacer la solicitud
242+
response = requests.post(url, json=data)
243+
return response.json()["response"]

0 commit comments

Comments
 (0)