"""Helpers for converting images and audio between base64 data URLs and stored files."""

import base64
import io
import logging
import mimetypes
import re
from pathlib import Path
from typing import Optional

import requests
from fastapi import (
    APIRouter,
    Depends,
    HTTPException,
    Request,
    UploadFile,
)

from open_webui.routers.images import (
    get_image_data,
    upload_image,
)
from open_webui.storage.provider import Storage
from open_webui.models.chats import Chats
from open_webui.models.files import Files
from open_webui.routers.files import upload_file_handler
from open_webui.retrieval.web.utils import validate_url

log = logging.getLogger(__name__)

BASE64_IMAGE_URL_PREFIX = re.compile(r'data:image/\w+;base64,', re.IGNORECASE)
MARKDOWN_IMAGE_URL_PATTERN = re.compile(r'!\[(.*?)\]\((.+?)\)', re.IGNORECASE)

# Content type used when neither the HTTP response nor the file name reveals one.
_DEFAULT_IMAGE_CONTENT_TYPE = 'image/png'
# Upper bound, in seconds, on waiting for a remote image host; without it a
# stalled server would block the caller indefinitely.
_IMAGE_DOWNLOAD_TIMEOUT = 30


def _read_file_as_data_url(file) -> Optional[str]:
    """Encode the stored file behind a file record as a base64 data URL.

    Args:
        file: File record exposing a ``path`` attribute understood by
            ``Storage.get_file``.

    Returns:
        A ``data:<content-type>;base64,<payload>`` string, or ``None`` when the
        resolved path is not an existing regular file.
    """
    file_path = Path(Storage.get_file(file.path))
    if not file_path.is_file():
        return None

    encoded_string = base64.b64encode(file_path.read_bytes()).decode('utf-8')
    content_type, _ = mimetypes.guess_type(file_path.name)
    # guess_type() yields None for unknown extensions; fall back to a real
    # MIME type instead of emitting a malformed "data:None;base64,..." URL.
    content_type = content_type or _DEFAULT_IMAGE_CONTENT_TYPE
    return f'data:{content_type};base64,{encoded_string}'


def get_image_base64_from_url(url: str) -> Optional[str]:
    """Return an image as a base64 data URL, given an HTTP(S) URL or a file ID.

    Strings starting with ``http`` are downloaded; anything else is treated as
    the ID of a stored file.

    Args:
        url: Remote image URL or stored file ID.

    Returns:
        The data URL, or ``None`` if the image could not be loaded for any
        reason (failures are logged, never raised).
    """
    try:
        if url.startswith('http'):
            # Validate URL to prevent SSRF attacks against local/private networks.
            # NOTE(review): the return value is not checked, so this presumably
            # relies on validate_url raising for rejected URLs - confirm. Also
            # note that redirects followed by requests are not re-validated.
            validate_url(url)

            response = requests.get(url, timeout=_IMAGE_DOWNLOAD_TIMEOUT)
            response.raise_for_status()

            encoded_string = base64.b64encode(response.content).decode('utf-8')
            content_type = response.headers.get('Content-Type', 'image/png')
            return f'data:{content_type};base64,{encoded_string}'

        file = Files.get_file_by_id(url)
        if not file:
            return None
        return _read_file_as_data_url(file)
    except Exception as e:
        # Best-effort by design: callers only distinguish "got an image" from
        # "did not", but the reason should still be visible in the logs.
        log.warning('Failed to load image as base64: %s', e)
        return None


def get_image_url_from_base64(request, base64_image_string, metadata, user):
    """Upload a base64 image data URL and return the URL of the stored image.

    Args:
        request: Current request, forwarded to ``upload_image``.
        base64_image_string: Candidate ``data:image/<type>;base64,...`` string.
        metadata: Metadata forwarded to ``upload_image``.
        user: User forwarded to ``upload_image``.

    Returns:
        ``None`` if the string does not start with an image data-URL prefix,
        ``''`` if the prefix matched but no image data could be extracted,
        otherwise the URL returned by ``upload_image``.
    """
    if not BASE64_IMAGE_URL_PREFIX.match(base64_image_string):
        return None

    image_data, content_type = get_image_data(base64_image_string)
    if image_data is None:
        return ''

    _, image_url = upload_image(
        request,
        image_data,
        content_type,
        metadata,
        user,
    )
    return image_url


def convert_markdown_base64_images(request, content: str, metadata, user):
    """Replace large inline base64 images in markdown with uploaded-file URLs.

    Every ``![alt](target)`` whose target is longer than 1024 characters is
    passed to ``get_image_url_from_base64``; when that yields a URL the target
    is swapped for it. All other image references are left untouched.

    Args:
        request: Current request, forwarded to the upload helper.
        content: Markdown text to rewrite.
        metadata: Metadata forwarded to the upload helper.
        user: User forwarded to the upload helper.

    Returns:
        The markdown text with qualifying images replaced.
    """
    # Short targets are ordinary links, not embedded payloads worth uploading.
    MIN_REPLACEMENT_URL_LENGTH = 1024

    def replace(match):
        base64_string = match.group(2)
        if len(base64_string) > MIN_REPLACEMENT_URL_LENGTH:
            url = get_image_url_from_base64(request, base64_string, metadata, user)
            if url:
                return f'![{match.group(1)}]({url})'
        return match.group(0)

    return MARKDOWN_IMAGE_URL_PATTERN.sub(replace, content)


def load_b64_audio_data(b64_str):
    """Decode base64 audio, with or without a ``data:<type>;base64,`` header.

    Args:
        b64_str: Either a full data URL or a bare base64 payload. A bare
            payload is treated as ``audio/wav``.

    Returns:
        ``(audio_bytes, content_type)`` on success, ``(None, None)`` if the
        input could not be parsed or decoded.
    """
    try:
        if ',' in b64_str:
            header, b64_data = b64_str.split(',', 1)
        else:
            b64_data = b64_str
            header = 'data:audio/wav;base64'

        audio_data = base64.b64decode(b64_data)
        # "data:audio/wav;base64" -> "audio/wav"
        content_type = header.split(';')[0].split(':')[1] if ';' in header else 'audio/wav'
        return audio_data, content_type
    except Exception as e:
        log.error('Error decoding base64 audio data: %s', e)
        return None, None


def upload_audio(request, audio_data, content_type, metadata, user):
    """Store raw audio bytes as an uploaded file and return its content URL.

    Args:
        request: Current request; also used to resolve the file-content route.
        audio_data: Raw audio bytes.
        content_type: MIME type of the audio, used for the file extension and
            the upload's ``content-type`` header.
        metadata: Metadata forwarded to ``upload_file_handler``.
        user: User forwarded to ``upload_file_handler``.

    Returns:
        The path of the ``get_file_content_by_id`` route for the new file.
    """
    # guess_extension() returns None for unknown MIME types; use an empty
    # suffix rather than producing a file literally named "generated-None".
    audio_format = mimetypes.guess_extension(content_type) or ''
    file = UploadFile(
        file=io.BytesIO(audio_data),
        filename=f'generated-{audio_format}',  # will be converted to a unique ID on upload_file
        headers={
            'content-type': content_type,
        },
    )
    file_item = upload_file_handler(
        request,
        file=file,
        metadata=metadata,
        process=False,
        user=user,
    )
    url = request.app.url_path_for('get_file_content_by_id', id=file_item.id)
    return url


def get_audio_url_from_base64(request, base64_audio_string, metadata, user):
    """Upload a base64 WAV data URL and return the URL of the stored audio.

    Args:
        request: Current request, forwarded to ``upload_audio``.
        base64_audio_string: String containing ``data:audio/wav;base64``.
        metadata: Metadata forwarded to ``upload_audio``.
        user: User forwarded to ``upload_audio``.

    Returns:
        ``None`` if the string carries no WAV data-URL marker, ``''`` if the
        marker is present but decoding failed, otherwise the uploaded file URL.
    """
    if 'data:audio/wav;base64' not in base64_audio_string:
        return None

    audio_data, content_type = load_b64_audio_data(base64_audio_string)
    if audio_data is None:
        return ''

    return upload_audio(
        request,
        audio_data,
        content_type,
        metadata,
        user,
    )


def get_file_url_from_base64(request, base64_file_string, metadata, user):
    """Upload a base64 image or WAV data URL and return the stored file's URL.

    Args:
        request: Current request, forwarded to the type-specific helper.
        base64_file_string: Candidate data URL.
        metadata: Metadata forwarded to the type-specific helper.
        user: User forwarded to the type-specific helper.

    Returns:
        The result of the image or audio helper, or ``None`` when the string
        is neither an image data URL nor a WAV data URL.
    """
    # Use the same prefix check as get_image_url_from_base64 so every image
    # type it accepts (not only PNG) is dispatched to it.
    if BASE64_IMAGE_URL_PREFIX.match(base64_file_string):
        return get_image_url_from_base64(request, base64_file_string, metadata, user)
    elif 'data:audio/wav;base64' in base64_file_string:
        return get_audio_url_from_base64(request, base64_file_string, metadata, user)
    return None


def get_image_base64_from_file_id(id: str) -> Optional[str]:
    """Return a stored file's content as a base64 data URL.

    Args:
        id: ID of the stored file.

    Returns:
        The data URL, or ``None`` if the file record does not exist, the file
        is not present on disk, or reading it failed (failures are logged).
        Errors raised by the record lookup itself are not caught.
    """
    file = Files.get_file_by_id(id)
    if not file:
        return None

    try:
        return _read_file_as_data_url(file)
    except Exception as e:
        log.warning('Failed to load file as base64: %s', e)
        return None