Руководство разр аботчика
Создание пользовательских воркеров
Для создания пользовательского воркера, расширьте класс BaseWorker который предоставляет всю общую функциональность:
<?php namespace YourNamespace\Workers;
use Seiger\sTask\Workers\BaseWorker;
use Seiger\sTask\Models\sTaskModel;
class ProductWorker extends BaseWorker
{
/**
* Уникальный идентификатор для этого воркера
*/
public function identifier(): string
{
return 'product';
}
/**
* Scope модуля/пакета (для фильтрации в админе)
*/
public function scope(): string
{
return 'scommerce';
}
/**
* Иконка для админ интерфейса
*/
public function icon(): string
{
return '<i class="fa fa-cube"></i>';
}
/**
* Краткое понятное название
*/
public function title(): string
{
return 'Управление товарами';
}
/**
* Детальное описание
*/
public function description(): string
{
return 'Импорт и экспорт товаров из/в CSV файлы';
}
/**
* Рендер виджета для админ интерфейса
*/
public function renderWidget(): string
{
return view('your-package::widgets.product-worker', [
'worker' => $this
])->render();
}
/**
* Настройки воркера (опционально)
*/
public function settings(): array
{
return [
'batch_size' => 100,
'timeout' => 3600,
'retry_on_fail' => true,
];
}
/**
* Действие: Импорт товаров из CSV
*/
public function taskImport(sTaskModel $task, array $options = []): void
{
try {
// Обновить статус задачи
$task->update(['status' => 20, 'message' => 'Начало импорта...']);
// Получить файл из опций
$file = $options['file'] ?? null;
if (!$file || !file_exists($file)) {
throw new \Exception('Файл для импорта не найден');
}
// Прочитать CSV
$handle = fopen($file, 'r');
$header = fgetcsv($handle);
// Посчитать общее количество строк
$total = 0;
while (fgets($handle)) $total++;
rewind($handle);
fgetcsv($handle); // Пропустить заголовок
$processed = 0;
$startTime = microtime(true);
// Обработать каждую строку
while (($row = fgetcsv($handle)) !== false) {
$data = array_combine($header, $row);
// Логика импорта товара
$this->importProduct($data);
$processed++;
// Обновлять прогресс каждые 10 элементов
if ($processed % 10 === 0 || $processed === $total) {
$progress = (int)(($processed / $total) * 100);
// Рассчитать ETA
$elapsed = microtime(true) - $startTime;
$rate = $processed / $elapsed;
$remaining = $total - $processed;
$etaSeconds = $remaining > 0 ? $remaining / $rate : 0;
$this->pushProgress($task, [
'progress' => $progress,
'processed' => $processed,
'total' => $total,
'eta' => niceEta($etaSeconds),
'message' => "Импортировано {$processed} из {$total} товаров"
]);
}
}
fclose($handle);
// Отметить как завершенное
$this->markFinished(
$task,
null,
"Успешно импортировано {$processed} товаров за " . round(microtime(true) - $startTime, 2) . "с"
);
} catch (\Exception $e) {
$this->markFailed($task, $e->getMessage());
}
}
/**
* Действие: Экспорт товаров в CSV
*/
public function taskExport(sTaskModel $task, array $options = []): void
{
try {
$task->update(['status' => 20, 'message' => 'Начало экспорта...']);
// Подготовить файл экспорта
$filename = 'products_' . date('Y-m-d_His') . '.csv';
$filepath = storage_path('stask/uploads/' . $filename);
if (!is_dir(dirname($filepath))) {
mkdir(dirname($filepath), 0755, true);
}
$handle = fopen($filepath, 'w');
// Записать заголовок
fputcsv($handle, ['ID', 'SKU', 'Название', 'Цена', 'Остаток']);
// Получить товары
$products = \DB::table('products')->get();
$total = count($products);
$processed = 0;
foreach ($products as $i => $product) {
fputcsv($handle, [
$product->id,
$product->sku,
$product->name,
$product->price,
$product->stock,
]);
$processed++;
if ($processed % 100 === 0 || $processed === $total) {
$progress = (int)(($processed / $total) * 100);
$this->pushProgress($task, [
'progress' => $progress,
'processed' => $processed,
'total' => $total,
'message' => "Экспортировано {$processed} из {$total} товаров"
]);
}
}
fclose($handle);
$this->markFinished(
$task,
$filepath,
"Экспортировано {$total} товаров в {$filename}"
);
} catch (\Exception $e) {
$this->markFailed($task, $e->getMessage());
}
}
/**
* Действие: Синхронизация остатков
*/
public function taskSyncStock(sTaskModel $task, array $options = []): void
{
try {
$source = $options['source'] ?? 'api';
$task->update(['status' => 20, 'message' => "Синхронизация остатков с {$source}..."]);
// Ваша логика синхронизации здесь
$items = $this->fetchStockFromSource($source);
$total = count($items);
foreach ($items as $i => $item) {
$this->updateProductStock($item['sku'], $item['quantity']);
if (($i + 1) % 50 === 0) {
$this->pushProgress($task, [
'progress' => (int)((($i + 1) / $total) * 100),
'processed' => $i + 1,
'total' => $total,
]);
}
}
$this->markFinished($task, null, "Синхронизированы остатки для {$total} товаров");
} catch (\Exception $e) {
$this->markFailed($task, $e->getMessage());
}
}
// Вспомогательные методы
private function importProduct(array $data): void
{
// Ваша логика импорта
}
private function fetchStockFromSource(string $source): array
{
// Ваша логика получения из API/источника
return [];
}
private function updateProductStock(string $sku, int $quantity): void
{
// Логика обновления
}
}
Автоматическое обнаружение воркеров
Воркеры автоматически обнаруживаются если они:
- Реализуют интерфейс
TaskInterface - Не являются абстрактными классами
- Могут быть инстанцированы
- Находятся в пространстве имен вашего пакета
Процесс обнаружения сканирует все установленные Composer пакеты и автоматически регистрирует воркеры.
Конфигурация воркеров
Кастомные настройки
Воркеры могут предоставлять собственную конфигурацию через метод renderSettings():
public function renderSettings(): string
{
$apiKey = $this->getConfig('api_key', '');
$endpoint = $this->getConfig('endpoint', '');
return <<<HTML
<h4><i data-lucide="key" class="w-4 h-4"></i> Конфигурация API</h4>
<div class="form-group">
<label>API Endpoint</label>
<input type="url"
class="form-control"
name="endpoint"
value="{$endpoint}"
placeholder="https://api.example.com">
</div>
<div class="form-group">
<label>API ключ</label>
<input type="text"
class="form-control"
name="api_key"
value="{$apiKey}"
placeholder="ваш-api-ключ">
</div>
<hr>
HTML;
}
Чтение конфигурации
Используйте методы BaseWorker для доступа к настройкам:
// Получить одно значение
$endpoint = $this->getConfig('endpoint', 'https://default.com');
// Получить вложенное значение (dot notation)
$timeout = $this->getConfig('api.timeout', 30);
// Получить все настройки
$settings = $this->settings();
Сохранение конфигурации
Конфигурация автоматически сохраняется через админ интерфейс. Также можно программно обновить:
// Установить одно значение
$this->setConfig('endpoint', 'https://api.example.com');
// Обновить несколько значений
$this->updateConfig([
'endpoint' => 'https://api.example.com',
'api_key' => 'secret-key',
'timeout' => 60,
]);
Хранение: Настройки сохраняются в s_workers.settings (JSON колонка).
Конфигурация расписания
Воркеры с методом taskMake() автоматически получают конфигурацию расписания в админ интерфейсе.
Типы расписания:
- По требованию - Только ручное выполнение
- Один раз - Выполнить один раз в указанный datetime
- Периодически - Выполнять в указанное время с периодичностью (ежечасно/ежедневно/еженедельно)
- Регулярно - Выполнять в временном периоде с интервалом (каждые 15/30/60 минут)
Проверка нужно ли запускать:
public function taskMake(sTaskModel $task, array $opt = []): void
{
// Проверка расписания (пропускаем для ручных запусков)
$isManual = $opt['manual'] ?? true;
if (!$isManual && !$this->shouldRunNow()) {
$task->update([
'status' => sTaskModel::TASK_STATUS_FINISHED,
'message' => 'Пропущено: вне графика расписания',
]);
return;
}
// Продолжение выполнения задачи...
}
Доступ к расписанию:
$schedule = $this->getSchedule();
// Возвращает:
// [
// 'type' => 'regular',
// 'enabled' => true,
// 'start_time' => '05:00',
// 'end_time' => '23:00',
// 'interval' => 'hourly',
// ]
API управления задачами
Создание задач
use Seiger\sTask\Facades\sTask;
// Базовое создание задачи
$task = sTask::create(
identifier: 'product',
action: 'import',
data: ['file' => '/path/to/products.csv'],
priority: 'high',
userId: evo()->getLoginUserID()
);
// Создать с собственным приоритетом
$task = sTask::create(
identifier: 'product',
action: 'export',
data: ['format' => 'csv', 'filters' => ['active' => true]],
priority: 'normal', // 'low', 'normal', 'high'
userId: evo()->getLoginUserID()
);
// Программное создание задачи из воркера
$worker = new ProductWorker();
$task = $worker->createTask('import', ['file' => 'products.csv']);
Обработка задач
// Обработать все ожидающие задачи (размер пакета по умолчанию: 10)
$processedCount = sTask::processPendingTasks();
// Обработать с собственным размером пакета
$processedCount = sTask::processPendingTasks(batchSize: 50);
// Получить статистику задач
$stats = sTask::getStats();
/* Возвращает:
[
'pending' => 5,
'running' => 2,
'completed' => 100,
'failed' => 3,
'cancelled' => 1,
'total' => 111,
]
*/
// Получить ожидающие задачи
$pending = sTask::getPendingTasks(limit: 20);
foreach ($pending as $task) {
echo "Задача #{$task->id}: {$task->identifier} -> {$task->action}\n";
}
Управление воркерами
// Обнаружить новые воркеры
$registered = sTask::discoverWorkers();
echo "Зарегистрировано " . count($registered) . " новых воркеров\n";
// Пересканировать существующие воркеры (обновить их метаданные)
$updated = sTask::rescanWorkers();
echo "Обновлено " . count($updated) . " воркеров\n";
// Очистить orphaned воркеры (классы больше не существуют)
$deleted = sTask::cleanOrphanedWorkers();
echo "Удалено {$deleted} orphaned воркеров\n";
// Получить всех воркеров
$workers = sTask::getWorkers(activeOnly: false);
foreach ($workers as $worker) {
echo "{$worker->identifier} ({$worker->scope}) - ";
echo $worker->active ? 'Активный' : 'Неактивный';
echo "\n";
}
// Получить конкретного воркера
$worker = sTask::getWorker('product');
if ($worker) {
echo "Название: {$worker->title}\n";
echo "Описание: {$worker->description}\n";
echo "Иконка: {$worker->icon}\n";
}
// Активировать/деактивировать воркеров
sTask::activateWorker('product');
sTask::deactivateWorker('old_worker');
// Фильтровать воркеров по scope
$commerceWorkers = \Seiger\sTask\Models\sWorker::byScope('scommerce')->get();
Выполнение задач
// Выполнить конкретную задачу
$task = \Seiger\sTask\Models\sTaskModel::find(1);
$result = sTask::execute($task);
if ($result) {
echo "Задача завершена успешно\n";
} else {
echo "Задача неудачна: {$task->message}\n";
}
// Повторить неудачную задачу
if ($task->canRetry()) {
sTask::retry($task);
}
Операции очистки
// Очистить задачи старше 30 дней
$deletedTasks = sTask::cleanOldTasks(days: 30);
echo "Удалено {$deletedTasks} старых задач\n";
// Очистить логи старше 30 дней
$deletedLogs = sTask::cleanOldLogs(days: 30);
echo "Удалено {$deletedLogs} старых файлов логов\n";
// Собственная очистка
$deleted = \Seiger\sTask\Models\sTaskModel::where('status', 30) // completed
->where('finished_at', '<', now()->subDays(7))
->delete();
Конвенция именования методов действий
Методы действий должны следовать конвенции task{Action}:
| Название действия | Название метода | Пример |
|---|---|---|
import | taskImport() | Импорт товаров |
export | taskExport() | Экспорт товаров |
sync_stock | taskSyncStock() | Синхронизация остатков |
generate_report | taskGenerateReport() | Генерация отчетов |
send_emails | taskSendEmails() | Массовая рассылка |
// Примеры преобразования названий действий:
'import' → taskImport()
'export' → taskExport()
'sync' → taskSync()
'sync_stock' → taskSyncStock()
'send_emails' → taskSendEmails()
'generate_report' → taskGenerateReport()
'cleanup-old-data' → taskCleanupOldData()
Отслеживание прогресса
Базовые обновления прогресса
public function taskProcess(sTaskModel $task, array $options = []): void
{
$items = range(1, 1000);
$total = count($items);
foreach ($items as $i => $item) {
// Обработать элемент
sleep(0.01); // Симуляция работы
// Обновить прогресс
$processed = $i + 1;
$progress = (int)(($processed / $total) * 100);
$this->pushProgress($task, [
'progress' => $progress,
'processed' => $processed,
'total' => $total,
'message' => "Обработка элемента {$processed} из {$total}"
]);
}
$this->markFinished($task);
}
Прогресс с расчетом ETA
public function taskLongRunning(sTaskModel $task, array $options = []): void
{
$total = 10000;
$startTime = microtime(true);
for ($i = 0; $i < $total; $i++) {
// Обработать элемент
$this->processItem($i);
// Обновлять каждые 100 элементов
if ($i > 0 && $i % 100 === 0) {
$elapsed = microtime(true) - $startTime;
$rate = $i / $elapsed; // элементов в секунду
$remaining = $total - $i;
$etaSeconds = $remaining / $rate;
$this->pushProgress($task, [
'progress' => (int)(($i / $total) * 100),
'processed' => $i,
'total' => $total,
'eta' => niceEta($etaSeconds),
'message' => "Обработка... {$i}/{$total}"
]);
}
}
$this->markFinished($task, null, "Обработано {$total} элементов");
}
Многоэтапный прогресс
public function taskMultiStage(sTaskModel $task, array $options = []): void
{
try {
// Этап 1: Подготовка (0-20%)
$this->pushProgress($task, [
'progress' => 5,
'message' => 'Подготовка данных...'
]);
$data = $this->prepareData();
$this->pushProgress($task, [
'progress' => 20,
'message' => 'Данные подготовлены'
]);
// Этап 2: Обработка (20-80%)
$total = count($data);
foreach ($data as $i => $item) {
$this->processItem($item);
// Прогресс от 20% до 80%
$stageProgress = ($i + 1) / $total; // 0.0 до 1.0
$overallProgress = 20 + ($stageProgress * 60); // 20 до 80
if ($i % 10 === 0) {
$this->pushProgress($task, [
'progress' => (int)$overallProgress,
'processed' => $i + 1,
'total' => $total,
'message' => "Обработка: {$i}/{$total}"
]);
}
}
// Этап 3: Завершение (80-100%)
$this->pushProgress($task, [
'progress' => 85,
'message' => 'Генерация отчета...'
]);
$reportPath = $this->generateReport($data);
$this->pushProgress($task, [
'progress' => 95,
'message' => 'Сохранение результатов...'
]);
$this->saveResults($data);
// Готово
$this->markFinished($task, $reportPath, 'Все этапы завершены');
} catch (\Exception $e) {
$this->markFailed($task, $e->getMessage());
}
}
Логирование
Файловая система отслеживания прогресса
sTask использует файловую систему отслеживания прогресса со структурированными логами:
Хранение:
- Расположение:
storage/stask/{task_id}.log - Формат: Значения разделенные вертикальной чертой
- Структура:
status|progress|processed|total|eta|message
Пример файла лога:
preparing|0|0|0|—|Подготовка задачи...
running|20|50|250|3m 15s|Обработка элементов...
running|45|112|250|2m 10s|Обработка элементов...
running|75|187|250|45s|Обработка элементов...
completed|100|250|250|0s|**Задача выполнена успешно (5.2s)**
Преимущества:
- Только добавление - Нет конфликтов блокировки файлов
- Полная история - Полная трассировка выполнения
- Быстрое чтение - Прочитать последнюю стро ку для текущего статуса
- Реальное время - Мгновенные обновления в UI
Методы обновления прогресса
pushProgress() - Главный метод для обновления прогресса:
$this->pushProgress($task, [
'progress' => 45, // 0-100
'processed' => 112, // Обработано элементов
'total' => 250, // Всего элементов
'eta' => '2m 10s', // Ожидаемое время
'message' => 'Обработка...' // Текущая операция
]);
Каждый вызов добавляет новую строку в файл лога со всей информацией.
Конвертация статусов в текст
sTask предоставляет централизованный метод для конвертации кодов статусов в текстовые представления:
Статический метод:
use Seiger\sTask\Models\sTaskModel;
// Конвертировать код статуса в текст
$statusText = sTaskModel::statusText(sTaskModel::TASK_STATUS_RUNNING);
// Возвращает: 'running'
// Использование в pushProgress
$this->pushProgress($task, [
'status' => sTaskModel::statusText(sTaskModel::TASK_STATUS_FINISHED),
'progress' => 100,
'message' => 'Задача выполнена'
]);
Использование экземпляра задачи:
// Получить текстовый статус из экземпляра задачи
$task = sTaskModel::find($id);
$statusText = $task->status_text; // Возвращает 'running', 'completed', и т.д.
// Доступные текстовые статусы:
// - 'pending' (TASK_STATUS_QUEUED = 10)
// - 'preparing' (TASK_STATUS_PREPARING = 30)
// - 'running' (TASK_STATUS_RUNNING = 50)
// - 'completed' (TASK_STATUS_FINISHED = 80)
// - 'failed' (TASK_STATUS_FAILED = 100)
Преимущества:
- Типобезопасность - Использование констант вместо жестко закодированных строк
- Согласованность - Все текстовые статусы происходят из одного места
- Легко расширять - Добавлять новые статусы в одном месте
- Без ошибок - Невозможно сделать ошибку в написании названий статусов
Автоматическое логирование
sTask автоматически логирует:
- Старт/завершение задачи
- Ошибки задачи
- Обновления прогресса (в файлах прогресса)
Файлы логов хранятся в storage/stask/{task_id}.log
Собственное логирование
use Seiger\sTask\Facades\sTask;
public function taskWithLogging(sTaskModel $task, array $options = []): void
{
// Info лог
sTask::log($task, 'info', 'Начало процесса импорта', [
'file' => $options['file'],
'user_id' => $task->started_by
]);
try {
foreach ($items as $item) {
try {
$this->processItem($item);
} catch (\Exception $e) {
// Warning для некритичных ошибок
sTask::log($task, 'warning', "Пропущен элемент {$item->id}", [
'reason' => $e->getMessage(),
'item' => $item->toArray()
]);
continue;
}
}
// Успех info
sTask::log($task, 'info', 'Импорт завершен успешно', [
'total_processed' => count($items)
]);
$this->markFinished($task);
} catch (\Exception $e) {
// Error лог
sTask::log($task, 'error', 'Импорт неудачен', [
'exception' => get_class($e),
'message' => $e->getMessage(),
'file' => $e->getFile(),
'line' => $e->getLine()
]);
$this->markFailed($task, $e->getMessage());
}
}
Чтение логов
$task = \Seiger\sTask\Models\sTaskModel::find(1);
// Получить все логи
$logs = $task->getLogs();
foreach ($logs as $log) {
echo "[{$log['timestamp']}] {$log['level']}: {$log['message']}\n";
if (!empty($log['context'])) {
print_r($log['context']);
}
}
// Получить последние 10 логов
$recentLogs = $task->getLastLogs(10);
// Получить только ошибки
$errorLogs = $task->getErrorLogs();
// Очистить логи задачи
$task->clearLogs();
// Скачать логи
return $task->logger()->downloadLogs($task);
Обработка ошибок
Базовая обработка ошибок
public function taskSafe(sTaskModel $task, array $options = []): void
{
try {
// Ваша логика
$result = $this->doSomething();
if (!$result) {
throw new \RuntimeException('Операция неудачна');
}
$this->markFinished($task);
} catch (\Exception $e) {
// Логировать детальную ошибку
sTask::log($task, 'error', $e->getMessage(), [
'exception' => get_class($e),
'file' => $e->getFile(),
'line' => $e->getLine(),
'trace' => $e->getTraceAsString()
]);
$this->markFailed($task, $e->getMessage());
}
}