Quickfork is a lightweight wrapper for PHP's PCNTL extension that adds inter-process communication (IPC) via Unix sockets and a simple API for running tasks in parallel. It is designed to be easy to use and to integrate into existing projects.
- Run tasks in parallel using a fixed pool of worker processes.
- IPC via Unix Sockets for communication between the main process and worker processes.
- Simple API for defining tasks and handling results.
- Error handling and timeouts for worker processes.
- Tasks are queued and executed as soon as a worker process is available, allowing for efficient resource management.
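
The queueing behavior in the last bullet can be pictured with a small simulation. This is only a sketch under stated assumptions, not Quickfork's implementation: it forks no processes, and `simulatePool`, its parameters, and the task durations are hypothetical names and values chosen for illustration. It assigns each queued task to whichever worker becomes free first.

```php
<?php
declare(strict_types=1);

/**
 * Simulates a fixed-size worker pool (hypothetical helper, no real forking).
 *
 * @param array<string, int> $durations task name => run time in seconds
 * @return array<string, array{worker: int, start: int, end: int}>
 */
function simulatePool(array $durations, int $maxConcurrent): array
{
    if ($maxConcurrent < 1) {
        throw new InvalidArgumentException('maxConcurrent must be at least 1.');
    }

    // Never use more workers than there are tasks.
    $workerCount = min($maxConcurrent, count($durations));
    // Time at which each worker becomes free again.
    $freeAt = $workerCount > 0 ? array_fill(0, $workerCount, 0) : [];
    $schedule = [];

    foreach ($durations as $name => $duration) {
        // Pick the worker that is free first (lowest index wins ties).
        $worker = array_search(min($freeAt), $freeAt, true);
        $start = $freeAt[$worker];
        $freeAt[$worker] = $start + $duration;
        $schedule[$name] = [
            'worker' => $worker,
            'start' => $start,
            'end' => $start + $duration,
        ];
    }

    return $schedule;
}

// Four tasks on a pool of two workers: 'c' and 'd' have to wait in the queue.
$schedule = simulatePool(['a' => 3, 'b' => 1, 'c' => 2, 'd' => 1], 2);
foreach ($schedule as $name => $slot) {
    echo "{$name}: worker {$slot['worker']}, start {$slot['start']}s, end {$slot['end']}s\n";
}
```

With a pool of two, tasks `a` and `b` start immediately, while `c` and `d` start only once a worker has finished its previous task.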
You can install Quickfork via Composer.
# For the latest stable version
composer require tbessenreither/quickfork
# For the latest development version (might be unstable)
composer require tbessenreither/quickfork:dev-main

The main use case of Quickfork is to run tasks in parallel with a fixed pool of worker processes.
This example command will create 4 tasks that will be run on a maximum of 5 worker threads. Each task will execute the workerThread method, which simulates some work and returns a string. One of the tasks will throw an exception to demonstrate error handling.
If there are more threads than tasks, only one thread per task is created, so this example creates only 4 threads. If there are more tasks than threads, the remaining tasks are queued and executed as soon as a thread becomes available.
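use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Tbessenreither\Quickfork\Objects\Task;
use Tbessenreither\Quickfork\Quickfork;
class FastparallelCommand extends Command
{
protected function execute(InputInterface $input, OutputInterface $output): int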
{
$io = new SymfonyStyle($input, $output);
$io->title('Running QuickFork Command');
$quickfork = new Quickfork();
try {
$tasks = [];
for ($i = 0; $i < 4; $i++) {
$task = new Task(
callable: $this->workerThread(...),
arguments: [
'number' => $i + 1,
'index' => $i,
],
);
$tasks[] = $task;
}
$results = $quickfork->runTasksInThreads($tasks, maxConcurrent: 5);
foreach ($tasks as $task) {
$taskId = $task->getId();
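$taskResult = $results[$taskId] ?? null;
echo "Task ID: {$taskId}, Result:\n";
if ($taskResult === null) {
// Guard against a task that has no entry in the results array.
echo "No result available.\n";
} elseif ($taskResult->hasError()) {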
echo "Error: " . $taskResult->getError()->getMessage() . "\n";
} else {
echo "Result: " . $taskResult->getResult() . "\n";
}
echo "-----------------------------\n";
}
$io->success('QuickFork Command executed successfully.');
} catch (Throwable $e) {
$io->error('An error occurred while executing QuickFork Command: ' . $e->getMessage());
return Command::FAILURE;
}
return Command::SUCCESS;
}
private function workerThread(Task $task, ?int $index = null, ?int $number = null): string
{
//sleep(rand(1, 3));
if ($number === 3) {
throw new RuntimeException('Simulated critical error in worker thread.');
}
return "Worker thread is executing task {$number}.";
}
}

This will print something like this:
This is a more niche use case, but it shows how to set up and use the IPC feature of Quickfork.
We create a single task that will run the taskHandler method. Then the following steps will happen:
1. The Quickfork process will trigger the fork_start message, which will be printed out by the parent process.
2. The taskHandler will send a message with the topic ready_for_task to the parent process.
3. The parent process will receive the ready_for_task message and send a new message with the topic new_task and some random numbers to sum up.
4. The taskHandler will receive the message, calculate the sum and then
   - send a message with the topic thread_result back to the parent process.
   - send another message with the topic ready_for_task to indicate that it is ready for another task.
5. The parent process will receive the result and print it out.
6. Steps 3-5 will repeat until the parent process has sent out 5 tasks, then it will send a message with the topic shutdown to the worker thread.
7. The taskHandler will receive the shutdown message and exit the loop.
   - This will cause Quickfork to trigger the fork_output, fork_result and fork_complete messages, which will be printed out by the parent process.
sequenceDiagram
Parent-->>+Fork: creates
Fork-->>Parent: fork_start (null)
loop 5 times
Fork->>+Parent: ready_for_task (null)
Parent->>+Fork: new_task (sum: [a, b])
Fork->>-Parent: thread_result (sum)
end
Parent-->>Fork: shutdown
Fork-->>Parent: fork_output (ob contents)
Fork-->>Parent: fork_result (return value)
Fork-->>-Parent: fork_complete (null)
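
To make the message flow in the diagram more concrete, here is a minimal sketch of the same exchange over a Unix socket pair. It is not Quickfork's code and makes several assumptions: both ends live in one process (nothing is forked), the wire format is one JSON object per line, the helpers `sendMessage` and `readMessage` are hypothetical, the numbers are fixed instead of random, and the shutdown is sent in reply to the last ready_for_task message to keep the blocking reads in a fixed order.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: writes one JSON-encoded message per line.
function sendMessage($socket, string $topic, mixed $content = null): void
{
    fwrite($socket, json_encode(['topic' => $topic, 'content' => $content]) . "\n");
}

// Hypothetical helper: reads the next message, or null if the stream ended.
function readMessage($socket): ?array
{
    $line = fgets($socket);
    return $line === false ? null : json_decode($line, true);
}

// A connected pair of Unix sockets; after a real fork each process keeps one end.
$pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
if ($pair === false) {
    throw new RuntimeException('Could not create socket pair.');
}
[$parentEnd, $forkEnd] = $pair;

$remaining = 2; // number of tasks the parent hands out before shutting down
$results = [];

// Fork side announces that it is ready.
sendMessage($forkEnd, 'ready_for_task');

while (true) {
    // Parent side: answer ready_for_task with a task or with shutdown.
    $msg = readMessage($parentEnd);
    if ($remaining === 0) {
        sendMessage($parentEnd, 'shutdown');
    } else {
        sendMessage($parentEnd, 'new_task', ['sum' => [$remaining, 10]]);
        $remaining--;
    }

    // Fork side: handle whatever the parent sent.
    $msg = readMessage($forkEnd);
    if ($msg['topic'] === 'shutdown') {
        break;
    }
    sendMessage($forkEnd, 'thread_result', array_sum($msg['content']['sum']));
    sendMessage($forkEnd, 'ready_for_task');

    // Parent side: collect the result before the next round.
    $msg = readMessage($parentEnd);
    $results[] = $msg['content'];
}

fclose($parentEnd);
fclose($forkEnd);

print_r($results);
```

In a real fork the two sides run concurrently, which is why the example command polls for messages in a loop instead of relying on a fixed read order.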
class QuickforkCommand extends Command
{
private int $numberOfTasks = 5;
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
$io->title('Running QuickFork Command');
$quickfork = new Quickfork();
$task = new Task(
callable: $this->taskHandler(...),
arguments: [],
);
$quickfork->runTask($task);
while ($task->isRunning()) {
$messages = $task->getSocket()->getMessages();
$this->handleParentMessages($messages, $task->getSocket());
usleep(100 * 1000); // Sleep for 100ms to prevent busy waiting
}
$finalMessages = $task->getSocket()->getMessages();
$this->handleParentMessages($finalMessages, $task->getSocket());
return Command::SUCCESS;
}
/**
* @param Message[] $messages
*/
private function handleParentMessages(array $messages, Socket $socket): void
{
foreach ($messages as $message) {
echo "Received message: Topic: {$message->getTopic()}\n";
if ($message->getTopic() === 'fork_start') {
echo "Task started with ID: {$message->getForkId()}\n";
} elseif ($message->getTopic() === 'fork_complete') {
echo "Task with ID {$message->getForkId()} completed.\n";
} elseif ($message->getTopic() === 'fork_output') {
echo "Output: {$message->getContent()}\n";
} elseif ($message->getTopic() === 'fork_result') {
echo "Result:\n";
var_dump($message->getContent());
echo "-----------------------------\n";
} elseif ($message->getTopic() === 'fork_error') {
echo "Error:\n";
var_dump($message->getContent());
echo "-----------------------------\n";
} elseif ($message->getTopic() === 'ready_for_task') {
echo "Worker thread is ready for task. Sending out new one.\n";
if ($this->numberOfTasks > 0) {
$socket->send(new Message(
topic: 'new_task',
content: [
'sum' => [rand(1, 100), rand(1, 100)],
]
));
$this->numberOfTasks--;
if ($this->numberOfTasks <= 0) {
$socket->send(new Message(
topic: 'shutdown',
));
}
}
} elseif ($message->getTopic() === 'thread_result') {
echo "Thread Result:\n";
var_dump($message->getContent());
echo "-----------------------------\n";
}
}
}
private function taskHandler(Task $task): string
{
$task->getSocket()->send(new Message(
topic: 'ready_for_task',
));
$active = true;
while ($active) {
$messages = $task->getSocket()->getMessages();
foreach ($messages as $message) {
if ($message->getTopic() === 'new_task') {
$content = $message->getContent();
$sum = array_sum($content['sum']);
echo "doing more stupid calculations for the boss...\n";
$task->getSocket()->send(new Message(
topic: 'thread_result',
content: "The sum of {$content['sum'][0]} and {$content['sum'][1]} is {$sum}.",
replyTo: $message->getId(),
));
$task->getSocket()->send(new Message(
topic: 'ready_for_task',
));
} elseif ($message->getTopic() === 'shutdown') {
$active = false;
return "Task is shutting down.";
}
}
usleep(100 * 1000); // Sleep for 100ms to prevent busy waiting
}
// Simulate some work
sleep(rand(1, 3));
return "Task with ID {$task->getId()} completed.";
}
}

This will print something like this:

