Skip to main content

Developer Guide

Creating Custom Workers

To create a custom worker, extend the BaseWorker class which provides all common functionality:

<?php namespace YourNamespace\Workers;

use Seiger\sTask\Workers\BaseWorker;
use Seiger\sTask\Models\sTaskModel;

class ProductWorker extends BaseWorker
{
/**
* Unique identifier for this worker
*/
public function identifier(): string
{
return 'product';
}

/**
* Module/package scope (for filtering in admin)
*/
public function scope(): string
{
return 'scommerce';
}

/**
* Icon for admin interface
*/
public function icon(): string
{
return '<i class="fa fa-cube"></i>';
}

/**
* Short human-readable title
*/
public function title(): string
{
return 'Product Management';
}

/**
* Detailed description
*/
public function description(): string
{
return 'Import and export products from/to CSV files';
}

/**
* Render widget for admin interface
*/
public function renderWidget(): string
{
return view('your-package::widgets.product-worker', [
'worker' => $this
])->render();
}

/**
* Worker settings (optional)
*/
public function settings(): array
{
return [
'batch_size' => 100,
'timeout' => 3600,
'retry_on_fail' => true,
];
}

/**
* Action: Import products from CSV
*/
public function taskImport(sTaskModel $task, array $options = []): void
{
try {
// Update task status
$task->update(['status' => 20, 'message' => 'Starting import...']);

// Get file from options
$file = $options['file'] ?? null;
if (!$file || !file_exists($file)) {
throw new \Exception('Import file not found');
}

// Read CSV
$handle = fopen($file, 'r');
$header = fgetcsv($handle);

// Count total rows
$total = 0;
while (fgets($handle)) $total++;
rewind($handle);
fgetcsv($handle); // Skip header

$processed = 0;
$startTime = microtime(true);

// Process each row
while (($row = fgetcsv($handle)) !== false) {
$data = array_combine($header, $row);

// Import product logic
$this->importProduct($data);

$processed++;

// Update progress every 10 items
if ($processed % 10 === 0 || $processed === $total) {
$progress = (int)(($processed / $total) * 100);

// Calculate ETA
$elapsed = microtime(true) - $startTime;
$rate = $processed / $elapsed;
$remaining = $total - $processed;
$etaSeconds = $remaining > 0 ? $remaining / $rate : 0;

$this->pushProgress($task, [
'progress' => $progress,
'processed' => $processed,
'total' => $total,
'eta' => niceEta($etaSeconds),
'message' => "Imported {$processed} of {$total} products"
]);
}
}

fclose($handle);

// Mark as finished
$this->markFinished(
$task,
null,
"Successfully imported {$processed} products in " . round(microtime(true) - $startTime, 2) . "s"
);

} catch (\Exception $e) {
$this->markFailed($task, $e->getMessage());
}
}

/**
* Action: Export products to CSV
*/
public function taskExport(sTaskModel $task, array $options = []): void
{
try {
$task->update(['status' => 20, 'message' => 'Starting export...']);

// Prepare export file
$filename = 'products_' . date('Y-m-d_His') . '.csv';
$filepath = storage_path('stask/uploads/' . $filename);

if (!is_dir(dirname($filepath))) {
mkdir(dirname($filepath), 0755, true);
}

$handle = fopen($filepath, 'w');

// Write header
fputcsv($handle, ['ID', 'SKU', 'Name', 'Price', 'Stock']);

// Get products
$products = \DB::table('products')->get();
$total = count($products);
$processed = 0;

foreach ($products as $i => $product) {
fputcsv($handle, [
$product->id,
$product->sku,
$product->name,
$product->price,
$product->stock,
]);

$processed++;

if ($processed % 100 === 0 || $processed === $total) {
$progress = (int)(($processed / $total) * 100);

$this->pushProgress($task, [
'progress' => $progress,
'processed' => $processed,
'total' => $total,
'message' => "Exported {$processed} of {$total} products"
]);
}
}

fclose($handle);

$this->markFinished(
$task,
$filepath,
"Exported {$total} products to {$filename}"
);

} catch (\Exception $e) {
$this->markFailed($task, $e->getMessage());
}
}

/**
* Action: Sync stock levels
*/
public function taskSyncStock(sTaskModel $task, array $options = []): void
{
try {
$source = $options['source'] ?? 'api';

$task->update(['status' => 20, 'message' => "Syncing stock from {$source}..."]);

// Your sync logic here
$items = $this->fetchStockFromSource($source);
$total = count($items);

foreach ($items as $i => $item) {
$this->updateProductStock($item['sku'], $item['quantity']);

if (($i + 1) % 50 === 0) {
$this->pushProgress($task, [
'progress' => (int)((($i + 1) / $total) * 100),
'processed' => $i + 1,
'total' => $total,
]);
}
}

$this->markFinished($task, null, "Synced stock for {$total} products");

} catch (\Exception $e) {
$this->markFailed($task, $e->getMessage());
}
}

// Helper methods
private function importProduct(array $data): void
{
// Your import logic
}

private function fetchStockFromSource(string $source): array
{
// Your API/source fetch logic
return [];
}

private function updateProductStock(string $sku, int $quantity): void
{
// Update logic
}
}

Worker Discovery

Workers are automatically discovered if they:

  1. Implement the TaskInterface
  2. Are not abstract classes
  3. Can be instantiated
  4. Are in your package namespace

The discovery process scans all installed Composer packages and registers workers automatically.

Worker Configuration

Custom Settings

Workers can provide custom configuration through the renderSettings() method:

public function renderSettings(): string
{
$apiKey = $this->getConfig('api_key', '');
$endpoint = $this->getConfig('endpoint', '');

return <<<HTML
<h4><i data-lucide="key" class="w-4 h-4"></i> API Configuration</h4>
<div class="form-group">
<label>API Endpoint</label>
<input type="url"
class="form-control"
name="endpoint"
value="{$endpoint}"
placeholder="https://api.example.com">
</div>
<div class="form-group">
<label>API Key</label>
<input type="text"
class="form-control"
name="api_key"
value="{$apiKey}"
placeholder="your-api-key">
</div>
<hr>
HTML;
}

Reading Configuration

Use BaseWorker methods to access settings:

// Get single value
$endpoint = $this->getConfig('endpoint', 'https://default.com');

// Get nested value (dot notation)
$timeout = $this->getConfig('api.timeout', 30);

// Get all settings
$settings = $this->settings();

Saving Configuration

Configuration is automatically saved through the admin interface. You can also programmatically update it:

// Set single value
$this->setConfig('endpoint', 'https://api.example.com');

// Update multiple values
$this->updateConfig([
'endpoint' => 'https://api.example.com',
'api_key' => 'secret-key',
'timeout' => 60,
]);

Storage: Settings are stored in s_workers.settings (JSON column).

Schedule Configuration

Workers with taskMake() method automatically get schedule configuration in admin interface.

Schedule Types:

  1. Manual - Only on-demand execution
  2. Once - Run once at specific datetime
  3. Periodic - Run at specific time with frequency (hourly/daily/weekly)
  4. Regular - Run within time period with interval (every 15/30/60 minutes)

Check if should run:

public function taskMake(sTaskModel $task, array $opt = []): void
{
// Check schedule (skip for manual runs)
$isManual = $opt['manual'] ?? true;
if (!$isManual && !$this->shouldRunNow()) {
$task->update([
'status' => sTaskModel::TASK_STATUS_FINISHED,
'message' => 'Skipped: outside schedule',
]);
return;
}

// Continue with task execution...
}

Access schedule:

$schedule = $this->getSchedule();
// Returns:
// [
// 'type' => 'regular',
// 'enabled' => true,
// 'start_time' => '05:00',
// 'end_time' => '23:00',
// 'interval' => 'hourly',
// ]

Task Management API

Creating Tasks

use Seiger\sTask\Facades\sTask;

// Basic task creation
$task = sTask::create(
identifier: 'product',
action: 'import',
data: ['file' => '/path/to/products.csv'],
priority: 'high',
userId: evo()->getLoginUserID()
);

// Create with custom priority
$task = sTask::create(
identifier: 'product',
action: 'export',
data: ['format' => 'csv', 'filters' => ['active' => true]],
priority: 'normal', // 'low', 'normal', 'high'
userId: evo()->getLoginUserID()
);

// Programmatic task creation from worker
$worker = new ProductWorker();
$task = $worker->createTask('import', ['file' => 'products.csv']);

Processing Tasks

// Process all pending tasks (default batch size: 10)
$processedCount = sTask::processPendingTasks();

// Process with custom batch size
$processedCount = sTask::processPendingTasks(batchSize: 50);

// Get task statistics
$stats = sTask::getStats();
/* Returns:
[
'pending' => 5,
'running' => 2,
'completed' => 100,
'failed' => 3,
'cancelled' => 1,
'total' => 111,
]
*/

// Get pending tasks
$pending = sTask::getPendingTasks(limit: 20);

foreach ($pending as $task) {
echo "Task #{$task->id}: {$task->identifier} -> {$task->action}\n";
}

Worker Management

// Discover new workers
$registered = sTask::discoverWorkers();
echo "Registered " . count($registered) . " new workers\n";

// Rescan existing workers (update their metadata)
$updated = sTask::rescanWorkers();
echo "Updated " . count($updated) . " workers\n";

// Clean orphaned workers (classes no longer exist)
$deleted = sTask::cleanOrphanedWorkers();
echo "Deleted {$deleted} orphaned workers\n";

// Get all workers
$workers = sTask::getWorkers(activeOnly: false);

foreach ($workers as $worker) {
echo "{$worker->identifier} ({$worker->scope}) - ";
echo $worker->active ? 'Active' : 'Inactive';
echo "\n";
}

// Get specific worker
$worker = sTask::getWorker('product');
if ($worker) {
echo "Title: {$worker->title}\n";
echo "Description: {$worker->description}\n";
echo "Icon: {$worker->icon}\n";
}

// Activate/deactivate workers
sTask::activateWorker('product');
sTask::deactivateWorker('old_worker');

// Filter workers by scope
$commerceWorkers = \Seiger\sTask\Models\sWorker::byScope('scommerce')->get();

Task Execution

// Execute specific task
$task = \Seiger\sTask\Models\sTaskModel::find(1);
$result = sTask::execute($task);

if ($result) {
echo "Task completed successfully\n";
} else {
echo "Task failed: {$task->message}\n";
}

// Retry failed task
if ($task->canRetry()) {
sTask::retry($task);
}

Cleanup Operations

// Clean tasks older than 30 days
$deletedTasks = sTask::cleanOldTasks(days: 30);
echo "Deleted {$deletedTasks} old tasks\n";

// Clean logs older than 30 days
$deletedLogs = sTask::cleanOldLogs(days: 30);
echo "Deleted {$deletedLogs} old log files\n";

// Custom cleanup
$deleted = \Seiger\sTask\Models\sTaskModel::where('status', 30) // completed
->where('finished_at', '<', now()->subDays(7))
->delete();

Action Method Naming Convention

Action methods must follow the task{Action} convention:

Action NameMethod NameExample
importtaskImport()Import products
exporttaskExport()Export products
sync_stocktaskSyncStock()Sync stock levels
generate_reporttaskGenerateReport()Generate reports
send_emailstaskSendEmails()Send bulk emails
// Action name conversion examples:
'import' → taskImport()
'export' → taskExport()
'sync' → taskSync()
'sync_stock' → taskSyncStock()
'send_emails' → taskSendEmails()
'generate_report' → taskGenerateReport()
'cleanup-old-data' → taskCleanupOldData()

Progress Tracking

Basic Progress Updates

public function taskProcess(sTaskModel $task, array $options = []): void
{
$items = range(1, 1000);
$total = count($items);

foreach ($items as $i => $item) {
// Process item
sleep(0.01); // Simulate work

// Update progress
$processed = $i + 1;
$progress = (int)(($processed / $total) * 100);

$this->pushProgress($task, [
'progress' => $progress,
'processed' => $processed,
'total' => $total,
'message' => "Processing item {$processed} of {$total}"
]);
}

$this->markFinished($task);
}

Progress with ETA Calculation

public function taskLongRunning(sTaskModel $task, array $options = []): void
{
$total = 10000;
$startTime = microtime(true);

for ($i = 0; $i < $total; $i++) {
// Process item
$this->processItem($i);

// Update every 100 items
if ($i > 0 && $i % 100 === 0) {
$elapsed = microtime(true) - $startTime;
$rate = $i / $elapsed; // items per second
$remaining = $total - $i;
$etaSeconds = $remaining / $rate;

$this->pushProgress($task, [
'progress' => (int)(($i / $total) * 100),
'processed' => $i,
'total' => $total,
'eta' => niceEta($etaSeconds),
'message' => "Processing... {$i}/{$total}"
]);
}
}

$this->markFinished($task, null, "Processed {$total} items");
}

Multi-stage Progress

public function taskMultiStage(sTaskModel $task, array $options = []): void
{
try {
// Stage 1: Preparation (0-20%)
$this->pushProgress($task, [
'progress' => 5,
'message' => 'Preparing data...'
]);

$data = $this->prepareData();

$this->pushProgress($task, [
'progress' => 20,
'message' => 'Data prepared'
]);

// Stage 2: Processing (20-80%)
$total = count($data);
foreach ($data as $i => $item) {
$this->processItem($item);

// Progress from 20% to 80%
$stageProgress = ($i + 1) / $total; // 0.0 to 1.0
$overallProgress = 20 + ($stageProgress * 60); // 20 to 80

if ($i % 10 === 0) {
$this->pushProgress($task, [
'progress' => (int)$overallProgress,
'processed' => $i + 1,
'total' => $total,
'message' => "Processing: {$i}/{$total}"
]);
}
}

// Stage 3: Finalization (80-100%)
$this->pushProgress($task, [
'progress' => 85,
'message' => 'Generating report...'
]);

$reportPath = $this->generateReport($data);

$this->pushProgress($task, [
'progress' => 95,
'message' => 'Saving results...'
]);

$this->saveResults($data);

// Done
$this->markFinished($task, $reportPath, 'All stages completed');

} catch (\Exception $e) {
$this->markFailed($task, $e->getMessage());
}
}

Logging

File-based Progress Tracking

sTask uses a file-based progress tracking system with structured logs:

Storage:

  • Location: storage/stask/{task_id}.log
  • Format: Pipe-separated values
  • Structure: status|progress|processed|total|eta|message

Example Log File:

preparing|0|0|0|—|Task preparing...
running|20|50|250|3m 15s|Processing items...
running|45|112|250|2m 10s|Processing items...
running|75|187|250|45s|Processing items...
completed|100|250|250|0s|**Task completed successfully (5.2s)**

Benefits:

  • Append-only - No file locking conflicts
  • Full history - Complete execution trace
  • Fast reads - Read last line for current status
  • Real-time - Instant updates in UI

Progress Update Methods

pushProgress() - Main method for progress updates:

$this->pushProgress($task, [
'progress' => 45, // 0-100
'processed' => 112, // Items processed
'total' => 250, // Total items
'eta' => '2m 10s', // Estimated time
'message' => 'Processing...' // Current operation
]);

Each call appends a new line to the log file with all information.

Status Text Conversion

sTask provides a centralized method to convert status codes to text representations:

Static Method:

use Seiger\sTask\Models\sTaskModel;

// Convert status code to text
$statusText = sTaskModel::statusText(sTaskModel::TASK_STATUS_RUNNING);
// Returns: 'running'

// Use in pushProgress
$this->pushProgress($task, [
'status' => sTaskModel::statusText(sTaskModel::TASK_STATUS_FINISHED),
'progress' => 100,
'message' => 'Task completed'
]);

Using Task Instance:

// Get text status from task instance
$task = sTaskModel::find($id);
$statusText = $task->status_text; // Returns 'running', 'completed', etc.

// Available status texts:
// - 'pending' (TASK_STATUS_QUEUED = 10)
// - 'preparing' (TASK_STATUS_PREPARING = 30)
// - 'running' (TASK_STATUS_RUNNING = 50)
// - 'completed' (TASK_STATUS_FINISHED = 80)
// - 'failed' (TASK_STATUS_FAILED = 100)

Benefits:

  • Type-safe - Use constants instead of hardcoded strings
  • Consistent - All status texts come from one place
  • Easy to extend - Add new statuses in one location
  • No typos - Impossible to misspell status names

Automatic Logging

sTask automatically logs:

  • Task start/completion
  • Task failures
  • Progress updates (in progress files)

Log files are stored in storage/stask/{task_id}.log

Custom Logging

use Seiger\sTask\Facades\sTask;

public function taskWithLogging(sTaskModel $task, array $options = []): void
{
// Info log
sTask::log($task, 'info', 'Starting import process', [
'file' => $options['file'],
'user_id' => $task->started_by
]);

try {
foreach ($items as $item) {
try {
$this->processItem($item);
} catch (\Exception $e) {
// Warning for non-critical errors
sTask::log($task, 'warning', "Skipped item {$item->id}", [
'reason' => $e->getMessage(),
'item' => $item->toArray()
]);
continue;
}
}

// Success info
sTask::log($task, 'info', 'Import completed successfully', [
'total_processed' => count($items)
]);

$this->markFinished($task);

} catch (\Exception $e) {
// Error log
sTask::log($task, 'error', 'Import failed', [
'exception' => get_class($e),
'message' => $e->getMessage(),
'file' => $e->getFile(),
'line' => $e->getLine()
]);

$this->markFailed($task, $e->getMessage());
}
}

Reading Logs

$task = \Seiger\sTask\Models\sTaskModel::find(1);

// Get all logs
$logs = $task->getLogs();
foreach ($logs as $log) {
echo "[{$log['timestamp']}] {$log['level']}: {$log['message']}\n";
if (!empty($log['context'])) {
print_r($log['context']);
}
}

// Get last 10 logs
$recentLogs = $task->getLastLogs(10);

// Get error logs only
$errorLogs = $task->getErrorLogs();

// Clear task logs
$task->clearLogs();

// Download logs
return $task->logger()->downloadLogs($task);

Error Handling

Basic Error Handling

public function taskSafe(sTaskModel $task, array $options = []): void
{
try {
// Your logic
$result = $this->doSomething();

if (!$result) {
throw new \RuntimeException('Operation failed');
}

$this->markFinished($task);

} catch (\Exception $e) {
// Log detailed error
sTask::log($task, 'error', $e->getMessage(), [
'exception' => get_class($e),
'file' => $e->getFile(),
'line' => $e->getLine(),
'trace' => $e->getTraceAsString()
]);

$this->markFailed($task, $e->getMessage());
}
}

Retry Logic

public function taskWithRetry(sTaskModel $task, array $options = []): void
{
$maxRetries = 3;
$currentAttempt = $task->attempts;

try {
// Attempt operation
$this->doUnreliableOperation();

$this->markFinished($task);

} catch (\Exception $e) {
if ($currentAttempt < $maxRetries) {
// Will retry
sTask::log($task, 'warning',
"Attempt {$currentAttempt} failed, will retry",
['error' => $e->getMessage()]
);

// sTask will automatically retry
throw $e;
} else {
// Max retries reached
sTask::log($task, 'error',
"All {$maxRetries} attempts failed",
['last_error' => $e->getMessage()]
);

$this->markFailed($task, "Failed after {$maxRetries} attempts: " . $e->getMessage());
}
}
}

Validation Before Processing

public function taskValidated(sTaskModel $task, array $options = []): void
{
// Validate options
$errors = [];

if (empty($options['file'])) {
$errors[] = 'File path is required';
} elseif (!file_exists($options['file'])) {
$errors[] = 'File does not exist: ' . $options['file'];
}

if (empty($options['user_id'])) {
$errors[] = 'User ID is required';
}

if (!empty($errors)) {
$this->markFailed($task, 'Validation failed: ' . implode('; ', $errors));
return;
}

// Process
try {
$this->processFile($options['file'], $options['user_id']);
$this->markFinished($task);
} catch (\Exception $e) {
$this->markFailed($task, $e->getMessage());
}
}

Task Priorities

// High priority - processed first
$urgentTask = sTask::create(
identifier: 'notification',
action: 'send_urgent',
data: ['email' => 'admin@example.com'],
priority: 'high'
);

// Normal priority - default
$normalTask = sTask::create(
identifier: 'report',
action: 'generate',
data: [],
priority: 'normal'
);

// Low priority - processed last
$backgroundTask = sTask::create(
identifier: 'cleanup',
action: 'archive_old_data',
data: [],
priority: 'low'
);

// Tasks are processed in priority order:
// 1. All 'high' priority tasks
// 2. All 'normal' priority tasks
// 3. All 'low' priority tasks

Task Statuses

use Seiger\sTask\Models\sTaskModel;

// Status constants
// 10 - pending
// 20 - running
// 30 - completed
// 40 - failed
// 50 - cancelled

// Check task status
$task = sTaskModel::find(1);

if ($task->isPending()) {
echo "Task is waiting to be processed\n";
}

if ($task->isRunning()) {
echo "Task is currently executing\n";
}

if ($task->isFinished()) {
echo "Task is done (completed, failed, or cancelled)\n";
}

// Get status text
echo $task->status_text; // 'pending', 'running', 'completed', 'failed', 'cancelled'

// Query by status
$pendingTasks = sTaskModel::pending()->get();
$runningTasks = sTaskModel::running()->get();
$completedTasks = sTaskModel::completed()->get();
$failedTasks = sTaskModel::failed()->get();

// Get incomplete tasks (not finished and not failed)
$incompleteTasks = sTaskModel::incomplete()->get();

// Query by identifier and action
$productImports = sTaskModel::byIdentifier('product')
->byAction('import')
->get();

// Recent failed tasks
$recentFailures = sTaskModel::failed()
->where('created_at', '>', now()->subDays(7))
->orderBy('created_at', 'desc')
->get();

Advanced Examples

Batch Processing with Chunking

public function taskBatchProcess(sTaskModel $task, array $options = []): void
{
$chunkSize = 100;
$processed = 0;

// Get total count
$total = \DB::table('products')->count();

\DB::table('products')->orderBy('id')->chunk($chunkSize, function($products) use ($task, &$processed, $total) {
foreach ($products as $product) {
$this->processProduct($product);
$processed++;
}

// Update progress after each chunk
$this->pushProgress($task, [
'progress' => (int)(($processed / $total) * 100),
'processed' => $processed,
'total' => $total,
'message' => "Processed {$processed}/{$total} products"
]);
});

$this->markFinished($task, null, "Processed {$processed} products");
}

File Download Task

public function taskDownload(sTaskModel $task, array $options = []): void
{
$url = $options['url'];
$filename = basename($url);
$destination = storage_path('downloads/' . $filename);

if (!is_dir(dirname($destination))) {
mkdir(dirname($destination), 0755, true);
}

$file = fopen($destination, 'w');

$ch = curl_init($url);
curl_setopt($ch, CURLOPT_FILE, $file);
curl_setopt($ch, CURLOPT_FOLLOWLOCATION, true);
curl_setopt($ch, CURLOPT_NOPROGRESS, false);

// Progress callback
curl_setopt($ch, CURLOPT_PROGRESSFUNCTION, function($resource, $downloadSize, $downloaded) use ($task) {
if ($downloadSize > 0) {
$progress = (int)(($downloaded / $downloadSize) * 100);

$this->pushProgress($task, [
'progress' => $progress,
'processed' => $downloaded,
'total' => $downloadSize,
'message' => "Downloaded " . $this->formatBytes($downloaded) . " of " . $this->formatBytes($downloadSize)
]);
}
});

curl_exec($ch);
$error = curl_error($ch);
curl_close($ch);
fclose($file);

if ($error) {
unlink($destination);
$this->markFailed($task, "Download failed: {$error}");
} else {
$this->markFinished($task, $destination, "Downloaded {$filename}");
}
}

private function formatBytes(int $bytes): string
{
$units = ['B', 'KB', 'MB', 'GB'];
for ($i = 0; $bytes > 1024 && $i < count($units) - 1; $i++) {
$bytes /= 1024;
}
return round($bytes, 2) . ' ' . $units[$i];
}

API Sync Task

public function taskApiSync(sTaskModel $task, array $options = []): void
{
$apiUrl = $options['api_url'];
$apiKey = $this->settings()['api_key'];

try {
// Fetch from API
$this->pushProgress($task, [
'progress' => 10,
'message' => 'Fetching data from API...'
]);

$response = $this->apiRequest($apiUrl, $apiKey);
$items = json_decode($response, true);

if (!is_array($items)) {
throw new \RuntimeException('Invalid API response');
}

$total = count($items);

// Process items
foreach ($items as $i => $item) {
$this->syncItem($item);

if (($i + 1) % 10 === 0) {
$progress = 10 + (int)((($i + 1) / $total) * 80); // 10% to 90%

$this->pushProgress($task, [
'progress' => $progress,
'processed' => $i + 1,
'total' => $total,
'message' => "Synced {$i+1}/{$total} items"
]);
}
}

// Final cleanup
$this->pushProgress($task, [
'progress' => 95,
'message' => 'Cleaning up...'
]);

$this->cleanup();

$this->markFinished($task, null, "Synced {$total} items from API");

} catch (\Exception $e) {
$this->markFailed($task, $e->getMessage());
}
}