File Uploader¶
Загрузка файлов на transfer.sh и S3-совместимые хранилища.
Возможности¶
- Загрузка на transfer.sh (бесплатно)
- Загрузка на S3/MinIO
- Прогресс загрузки
- Автоматический retry
- TTL для временных файлов
Использование¶
Transfer.sh (простая загрузка)¶
from kit.utils import FileUploader
uploader = FileUploader()
# Загрузка файла
url = await uploader.upload("video.mp4")
print(url) # "https://transfer.sh/abc123/video.mp4"
# С кастомным TTL (дни)
url = await uploader.upload("document.pdf", ttl_days=7)
S3/MinIO¶
uploader = FileUploader(
backend="s3",
s3_endpoint="https://s3.amazonaws.com",
s3_access_key="...",
s3_secret_key="...",
s3_bucket="my-bucket"
)
url = await uploader.upload("image.jpg")
# "https://my-bucket.s3.amazonaws.com/uploads/abc123/image.jpg"
# В конкретную папку
url = await uploader.upload(
"image.jpg",
folder="avatars",
filename="user_123.jpg"
)
Прогресс загрузки¶
async def progress_callback(uploaded: int, total: int):
percent = uploaded / total * 100
print(f"Uploaded: {percent:.1f}%")
url = await uploader.upload(
"large_video.mp4",
progress_callback=progress_callback
)
Загрузка из bytes¶
# Из bytes
image_bytes = await generate_image()
url = await uploader.upload_bytes(
data=image_bytes,
filename="generated.png",
content_type="image/png"
)
# Из base64
url = await uploader.upload_base64(
data=base64_string,
filename="image.jpg"
)
Batch загрузка¶
files = ["video1.mp4", "video2.mp4", "video3.mp4"]
# Параллельная загрузка
urls = await uploader.upload_batch(files, max_concurrent=3)
for file, url in zip(files, urls):
print(f"{file} -> {url}")
Загрузка с URL¶
# Скачать и перезалить
url = await uploader.upload_from_url(
source_url="https://example.com/image.jpg",
filename="rehosted.jpg"
)
API Reference¶
FileUploader¶
class FileUploader:
def __init__(
self,
backend: str = "transfer.sh", # "transfer.sh" или "s3"
s3_endpoint: str = None,
s3_access_key: str = None,
s3_secret_key: str = None,
s3_bucket: str = None,
s3_region: str = "us-east-1",
max_retries: int = 3,
timeout: float = 300
)
async def upload(
self,
file_path: str,
folder: str = None,
filename: str = None,
ttl_days: int = None,
progress_callback: Callable = None
) -> str
async def upload_bytes(
self,
data: bytes,
filename: str,
content_type: str = None,
folder: str = None
) -> str
async def upload_base64(
self,
data: str,
filename: str,
folder: str = None
) -> str
async def upload_batch(
self,
file_paths: List[str],
max_concurrent: int = 5
) -> List[str]
async def upload_from_url(
self,
source_url: str,
filename: str = None,
folder: str = None
) -> str
async def delete(self, url: str) -> bool # S3 only
async def close(self) -> None
Примеры из production¶
Autoshorts — загрузка готовых видео¶
class VideoUploader:
def __init__(self):
self.uploader = FileUploader(
backend="s3",
s3_endpoint=settings.s3_endpoint,
s3_access_key=settings.s3_access_key,
s3_secret_key=settings.s3_secret_key,
s3_bucket="autoshorts-videos"
)
self.ws_manager = WebSocketManager()
async def upload_result(
self,
task_id: str,
video_path: str,
user_id: str
) -> str:
"""Загрузка готового видео с прогрессом."""
async def on_progress(uploaded: int, total: int):
percent = uploaded / total * 100
await self.ws_manager.send_personal(task_id, {
"type": "upload_progress",
"percent": percent
})
url = await self.uploader.upload(
video_path,
folder=f"users/{user_id}",
filename=f"{task_id}.mp4",
progress_callback=on_progress
)
return url
Telegram Bot — загрузка медиа¶
class TelegramMediaUploader:
def __init__(self):
self.uploader = FileUploader() # transfer.sh для простоты
async def upload_voice(self, audio_bytes: bytes, user_id: str) -> str:
"""Загрузка голосового сообщения."""
return await self.uploader.upload_bytes(
data=audio_bytes,
filename=f"voice_{user_id}_{int(time.time())}.ogg",
content_type="audio/ogg"
)
async def upload_and_send(
self,
bot,
chat_id: int,
file_path: str,
caption: str = None
):
"""Загрузка и отправка файла."""
url = await self.uploader.upload(file_path, ttl_days=1)
await bot.send_document(
chat_id=chat_id,
document=url,
caption=caption
)
Music Video Generator — временные файлы¶
class TempFileManager:
def __init__(self):
self.uploader = FileUploader()
self.temp_urls = []
async def upload_temp(self, file_path: str) -> str:
"""Загрузка временного файла (24 часа)."""
url = await self.uploader.upload(file_path, ttl_days=1)
self.temp_urls.append(url)
return url
async def cleanup(self):
"""Очистка временных файлов."""
# transfer.sh автоматически удаляет по TTL
# Для S3 нужно удалять вручную
if self.uploader.backend == "s3":
for url in self.temp_urls:
await self.uploader.delete(url)
self.temp_urls = []