Asynchronous processing
1 Introduction
Temma offers Asynk, a simple way of performing asynchronous processing. Asynk builds on the dependency injection component and extends it so that calls to objects managed by the component are processed asynchronously.
Example:
// synchronous call via loader
$this->_loader->MyObject->myMethod($param1, $param2);
// identical asynchronous call
$this->_loader->asynk->MyObject->myMethod($param1, $param2);
As with the dependency injection component, array notation can be used to call an object located in a namespace:
// asynchronous call
$this->_loader->asynk['\My\Name\Space\MyObject']->myMethod($param1, $param2);
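The following is a minimal sketch of the general idea, not Temma's actual implementation: a proxy object intercepts the call and records it as a task description instead of executing it. The class names `AsyncProxy` and `TaskRecorder`, the JSON serialization and the in-memory task list are all assumptions made for illustration; the `target`, `action` and `data` keys simply mirror the column names of the MySQL table described in section 2.2.

```php
<?php
// Hypothetical illustration only: this is NOT Temma's implementation.
// It shows how a proxy can capture a method call instead of running it.

/** Records calls made on one target class as task descriptions. */
final class TaskRecorder
{
    /** @var array<int, array{target: string, action: string, data: string}> */
    public static array $tasks = [];

    public function __construct(private string $target) {}

    /** Intercepts any method call and stores it as a task. */
    public function __call(string $method, array $args): mixed
    {
        self::$tasks[] = [
            'target' => $this->target,
            'action' => $method,
            'data'   => json_encode($args), // serialization format is an assumption
        ];
        return null;
    }
}

/** Entry point supporting both ->MyObject and ['\Name\Space\MyObject']. */
final class AsyncProxy implements ArrayAccess
{
    public function __get(string $name): TaskRecorder
    {
        return new TaskRecorder($name);
    }
    public function offsetGet(mixed $offset): mixed
    {
        return new TaskRecorder((string) $offset);
    }
    public function offsetExists(mixed $offset): bool { return true; }
    public function offsetSet(mixed $offset, mixed $value): void {}
    public function offsetUnset(mixed $offset): void {}
}

$asynk = new AsyncProxy();
$asynk->MyObject->myMethod('a', 2);
$asynk['\My\Name\Space\MyObject']->myMethod('b');
```

After these two calls nothing has been executed; `TaskRecorder::$tasks` holds two entries that a background process could later replay against the real objects.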
Tasks can be executed in several different ways:
- by Temma workers (programs running in the background)
- by a Temma script run by crontab every minute
- by a Temma script run by xinetd each time a task is launched
For worker or crontab execution, tasks can be retrieved from a message queue (Beanstalkd or AWS SQS) or a database (MySQL or Redis). For xinetd execution, only database storage (MySQL or Redis) is possible.
If a message queue is used, task data can be stored in one of two ways: either directly in the queue messages, or in a separate database (MySQL or Redis). Using a database can be useful if tasks require more data than the queue can store (64 KB for Beanstalkd, 256 KB for AWS SQS).
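As a rough illustration of that choice, the sketch below checks whether a serialized payload fits within the limits quoted above. The function name `fitsInQueue` and the constant `QUEUE_LIMITS` are hypothetical, and the real limits depend on how each broker is configured, so the figures should be treated as assumptions.

```php
<?php
// Payload limits taken from the text above; real limits depend on the
// broker's configuration, so treat these constants as assumptions.
const QUEUE_LIMITS = [
    'beanstalkd' => 64 * 1024,
    'sqs'        => 256 * 1024,
];

/** Returns true when the serialized payload fits in a single queue message. */
function fitsInQueue(string $payload, string $queue): bool
{
    if (!isset(QUEUE_LIMITS[$queue])) {
        throw new InvalidArgumentException("Unknown queue type: $queue");
    }
    return strlen($payload) <= QUEUE_LIMITS[$queue];
}

// A payload of roughly 100 KB.
$payload = json_encode(['report' => str_repeat('x', 100_000)]);
var_dump(fitsInQueue($payload, 'beanstalkd')); // bool(false): use a separate storage
var_dump(fitsInQueue($payload, 'sqs'));        // bool(true)
```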
Beanstalkd is the recommended message queue for running tasks with Asynk.
You can also use an Amazon SQS queue, but this requires workers to poll the queue regularly to see if there are any jobs waiting to be executed, and there is a charge for each request made.
The simplest solution is to store tasks in a database and execute them via crontab. For fast execution of tasks, as close as possible to real time, we recommend adding xinetd execution.
2 Configuration
2.1 Principle
Configuration is based on an x-asynk extended configuration in the etc/temma.php file, containing two parameters:
- transport: the name of the data source used to transmit tasks. Behavior differs depending on the type of data source.
- storage: the name of the data source that stores messages until they are processed. Here again, the behavior depends on the type of source.
Depending on the value, either parameter may be optional.
Here are the possible combinations:
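transport | storage | Processing |
---|---|---|
(none) | MySQL | Processing with crontab or workers |
(none) | Redis | Processing with crontab or workers |
socket | MySQL | Processed by xinetd (+ optional crontab) |
socket | Redis | Processed by xinetd (+ optional crontab) |
Beanstalk | Beanstalk | Processing via workers, on tasks stored in the message queue |
SQS | SQS | Processing via workers, on tasks stored in the message queue |
Beanstalk | MySQL | Processing via workers, on tasks stored in a database (useful if data is too large to be stored in the queue) |
Beanstalk | Redis | Processing via workers, on tasks stored in a database (useful if data is too large to be stored in the queue) |
SQS | MySQL | Processing via workers, on tasks stored in a database (useful if data is too large to be stored in the queue) |
SQS | Redis | Processing via workers, on tasks stored in a database (useful if data is too large to be stored in the queue) |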
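The combinations in the table can be expressed as a small lookup function. This is only a sketch for checking a configuration by hand: the function name `processingMode` and the lowercase type strings are assumptions, and Asynk itself may validate its configuration differently.

```php
<?php
/**
 * Returns the processing mode for a transport/storage pair, following the
 * table above. Types are lowercase strings; a null transport means that no
 * transport is configured. (Hypothetical helper, not part of Temma.)
 */
function processingMode(?string $transport, string $storage): string
{
    $databases = ['mysql', 'redis'];
    $queues = ['beanstalk', 'sqs'];

    if ($transport === null && in_array($storage, $databases, true)) {
        return 'crontab or workers';
    }
    if ($transport === 'socket' && in_array($storage, $databases, true)) {
        return 'xinetd (+ optional crontab)';
    }
    if (in_array($transport, $queues, true) && $storage === $transport) {
        return 'workers, tasks stored in the message queue';
    }
    if (in_array($transport, $queues, true) && in_array($storage, $databases, true)) {
        return 'workers, tasks stored in a database';
    }
    throw new InvalidArgumentException('Unsupported transport/storage combination');
}

echo processingMode(null, 'redis'), "\n";      // crontab or workers
echo processingMode('sqs', 'mysql'), "\n";     // workers, tasks stored in a database
```

Any pair that does not appear in the table, such as a socket transport with queue storage, raises an exception.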
Asynk writes log messages using the Temma/Asynk log class.
Here's an example configuration file:
<?php
return [
    'application' => [
        // data sources
        'dataSources' => [
            // MySQL database
            'db' => 'mysql://user:pwd@host/database',
            // Amazon SQS message queue
            'sqs' => 'sqs://AKXYZ:PWD@FILE',
        ]
    ],
    // log message thresholds
    'loglevels' => [
        'Temma/Base' => 'ERR',
        'Temma/Web' => 'WARN',
        'Temma/Asynk' => 'NOTE',
    ],
    // Asynk configuration
    'x-asynk' => [
        // transport: SQS queue
        'transport' => 'sqs',
        // storage: MySQL database
        'storage' => 'db'
    ],
];
2.2 MySQL storage
If you choose to store your tasks in a MySQL database, you'll need to create the table containing the tasks.
Here's the query for creating this table:
CREATE TABLE Task (
    id           INT UNSIGNED NOT NULL AUTO_INCREMENT,
    dateCreation DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    dateUpdate   DATETIME ON UPDATE CURRENT_TIMESTAMP,
    status       ENUM('waiting', 'reserved', 'processing', 'error') NOT NULL DEFAULT 'waiting',
    token        CHAR(16) CHARACTER SET ascii COLLATE ascii_general_ci,
    target       TINYTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
    action       TINYTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
    data         MEDIUMTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci,
    PRIMARY KEY (id),
    INDEX status (status),
    INDEX token (token)
);
The primary key (id field) can be of type BIGINT if you need to manage more than 4 billion tasks.
If the table is stored in a different database from that of the connection, or if it is named something other than Task, or the fields have other names, this can be specified in the x-asynk extended configuration in etc/temma.php.
<?php
return [
    'x-asynk' => [
        'base'   => 'asynk_db',
        'table'  => 'asynk_tasks',
        'id'     => 'task_id',
        'status' => 'task_status',
        'token'  => 'task_token',
        'target' => 'task_target',
        'action' => 'task_action',
        'data'   => 'task_data',
    ]
];
- base: Name of the database containing the table.
- table: Name of the table.
- id: Name of the field containing the primary key.
- status: Name of the field containing the status.
- token: Name of the field containing the reservation token.
- target: Name of the field containing the target object.
- action: Name of the field containing the target method.
- data: Name of the field containing the serialized parameters.
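To make the effect of these parameters concrete, here is a hedged sketch of how such overrides could be merged with the default names from the CREATE TABLE statement and turned into a query. The helper `waitingTasksQuery`, the constant `ASYNK_DEFAULTS` and the shape of the query are assumptions for illustration; the queries Asynk really issues are not documented here.

```php
<?php
// Default names taken from the CREATE TABLE statement above.
// 'base' => null means "use the database of the connection".
const ASYNK_DEFAULTS = [
    'base' => null, 'table' => 'Task', 'id' => 'id', 'status' => 'status',
    'token' => 'token', 'target' => 'target', 'action' => 'action', 'data' => 'data',
];

/** Builds a SELECT for waiting tasks using the configured names (hypothetical). */
function waitingTasksQuery(array $conf = []): string
{
    // Configured values override the defaults key by key.
    $c = array_merge(ASYNK_DEFAULTS, $conf);
    // Quote an identifier the MySQL way, doubling any embedded backtick.
    $quote = fn(string $name): string => '`' . str_replace('`', '``', $name) . '`';

    $table = ($c['base'] !== null ? $quote($c['base']) . '.' : '') . $quote($c['table']);
    $columns = implode(', ', array_map($quote, [$c['id'], $c['target'], $c['action'], $c['data']]));

    return sprintf("SELECT %s FROM %s WHERE %s = 'waiting'", $columns, $table, $quote($c['status']));
}

echo waitingTasksQuery(), "\n";
echo waitingTasksQuery(['base' => 'asynk_db', 'table' => 'asynk_tasks', 'status' => 'task_status']), "\n";
```

Parameters that are left out keep the default names, so only the names that differ need to be configured.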
It is also possible to use a customized DAO:
<?php
return [
    'x-asynk' => [
        'dao' => '\MyApp\AsynkDao'
    ]
];
3 Crontab processing
3.1 Introduction to crontab
Crontab processing is very simple to implement. The crontab daemon is installed or installable on all Unix systems, and is independent of other software such as a message queue.
Crontab processing runs every minute. This can cause a slight delay in processing tasks. If you need latency-free processing, you can add xinetd processing (see below).
3.2 Temma configuration for crontab
The storage used to save tasks must be declared in the extended x-asynk configuration.
Example file etc/temma.php, with Redis storage:
<?php
return [
    'application' => [
        'dataSources' => [
            'ndb' => 'redis://localhost'
        ]
    ],
    'x-asynk' => [
        'storage' => 'ndb'
    ],
];
3.3 Crontab configuration
There are two ways of configuring the crontab:
- Modify the contents of the file etc/asynk/crontab to adapt the path to the root of your project. Then copy this file to /etc/cron.d/ (under a suitable name), for example with the command:
  sudo cp /path/to/project/etc/asynk/crontab /etc/cron.d/my_project
- Or add the following line (adapting the path) to the crontab of the desired user:
  * * * * * cd /path/to/project/; bin/comma AsynkWorker crontab
4 Xinetd processing
4.1 Introducing xinetd
Xinetd is a “super-daemon” that listens on several network ports. Each time it receives an incoming connection, it launches the associated program and takes charge of network exchanges.
For Asynk management, xinetd listens on port 11137 by default.
When Asynk uses xinetd, asynchronous tasks are processed immediately. However, there are two things to bear in mind:
- If you need to handle a very large number of simultaneous tasks, xinetd will reach its limits. In this case, we recommend using a message queue (Beanstalkd or SQS).
- It may happen that xinetd is unable to handle certain tasks (the daemon is not running or is saturated). It is therefore advisable to combine xinetd processing with crontab processing (see above), so that tasks not processed by xinetd are processed by the crontab.
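The reason the crontab fallback works is that the task is saved in storage before xinetd is contacted, so a failed notification loses nothing. The sketch below illustrates that ordering under stated assumptions: `dispatchTask` and both callables are hypothetical stand-ins for the real storage and socket code, and the use of `RuntimeException` to signal an unreachable daemon is also an assumption.

```php
<?php
/**
 * Stores a task, then tries to notify the xinetd listener.
 * Both callables are hypothetical stand-ins for the real storage and socket.
 * Returns true when the notification succeeded, false when the task is
 * left in storage for the next crontab run.
 */
function dispatchTask(array $task, callable $store, callable $notify): bool
{
    $id = $store($task);      // persist first, so nothing is lost
    try {
        $notify($id);         // e.g. a connection to tcp://localhost:11137
        return true;
    } catch (RuntimeException $e) {
        return false;         // the crontab run will pick the task up later
    }
}

// Simulated storage: keeps tasks in an array and returns a numeric identifier.
$stored = [];
$store = function (array $task) use (&$stored): int {
    $stored[] = $task;
    return count($stored);
};
// Simulated xinetd that is not running.
$down = function (int $id): void {
    throw new RuntimeException('xinetd unreachable');
};

$sent = dispatchTask(['target' => 'MyObject', 'action' => 'myMethod'], $store, $down);
var_dump($sent);          // bool(false)
var_dump(count($stored)); // int(1): the task is still waiting in storage
```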
4.2 Temma configuration for xinetd
In the configuration, a socket data source is required, which Asynk will use to connect to xinetd. The xinetd server can be on the local server or on a remote machine. By default, the connection port used by Asynk is 11137.
Example file etc/temma.php, with MySQL storage:
<?php
return [
    'application' => [
        'dataSources' => [
            'sock' => 'tcp://localhost:11137',
            'db' => 'mysql://user:password@localhost'
        ]
    ],
    'x-asynk' => [
        'transport' => 'sock',
        'storage' => 'db'
    ],
];
4.3 Xinetd configuration
To configure xinetd, edit the file etc/asynk/xinetd to adapt the path, then copy it to the /etc/xinetd.d/ directory (under a suitable name), for example with the command:
sudo cp /path/to/project/etc/asynk/xinetd /etc/xinetd.d/my_project
4.4 (optional) Crontab configuration
Using the crontab in addition to xinetd ensures that all tasks are processed, even if xinetd isn't running or is saturated.
There are two ways of configuring the crontab:
- Modify the contents of the file etc/asynk/crontab to adapt the path to the root of your project. Then copy this file to /etc/cron.d/ (under a suitable name), for example with the command:
  sudo cp /path/to/project/etc/asynk/crontab /etc/cron.d/my_project
- Or add the following line (adapting the path) to the crontab of the desired user:
  * * * * * cd /path/to/project/; bin/comma AsynkWorker crontab
5 Processing by worker
5.1 Introduction to workers
Workers are programs that run in the background. You can run as many workers as you like. If only one worker is running, it can be considered a processing daemon.
Workers connect to the data source (message queue or database) to retrieve the tasks to be performed. They retrieve tasks one by one, and process them sequentially. If you have a large number of tasks to process, it's best to have several workers running in parallel, otherwise tasks may pile up faster than they can be processed.
It's important to ensure that a minimum number of workers are running at all times, to avoid the risk of tasks not being processed. You can use a supervisor such as Supervisord, which automatically restarts workers when the number of running instances falls below the minimum.
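A back-of-the-envelope estimate of that minimum can be derived from the arrival rate and the average processing time: since each worker handles tasks one at a time, the workers' combined throughput must at least match the rate at which tasks arrive. The function `minWorkers` below is a hypothetical helper for this arithmetic, not something provided by Temma, and it ignores bursts and polling delays.

```php
<?php
/**
 * Smallest number of workers whose combined throughput covers the arrival
 * rate, assuming every task takes the same average time. (Hypothetical helper.)
 */
function minWorkers(float $tasksPerMinute, float $secondsPerTask): int
{
    if ($secondsPerTask <= 0) {
        throw new InvalidArgumentException('Processing time must be positive');
    }
    // One worker processes tasks sequentially: 60 / duration tasks per minute.
    $perWorkerPerMinute = 60 / $secondsPerTask;
    return max(1, (int) ceil($tasksPerMinute / $perWorkerPerMinute));
}

// 120 tasks per minute, 2 seconds each: one worker handles 30 per minute.
echo minWorkers(120, 2), "\n"; // 4
```

In practice it is safer to run somewhat more workers than this estimate, so that a temporary burst or a restarting worker does not let tasks pile up.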
5.2 Temma configuration for workers
Temma configuration must contain information on the storage and, where applicable, the transport of tasks.
Example file etc/temma.php, with Beanstalkd transport and MySQL storage:
<?php
return [
    'application' => [
        'dataSources' => [
            'beanstalk' => 'tcp://localhost:11137',
            'db' => 'mysql://user:password@localhost'
        ]
    ],
    'x-asynk' => [
        'transport' => 'beanstalk',
        'storage' => 'db'
    ],
];
Another example of an etc/temma.php file, with SQS transport and storage:
<?php
return [
    'application' => [
        'dataSources' => [
            'sqs' => 'sqs://AKXYZ:PWD@sqs.eu-west-3.amazonaws.com/123456789012/queue_name'
        ]
    ],
    'x-asynk' => [
        'transport' => 'sqs',
        'storage' => 'sqs'
    ],
];
By default, polling workers wait 60 seconds between two connections to retrieve pending tasks. This applies to workers connecting to an Amazon SQS queue or to a MySQL or Redis database; it does not apply to Beanstalkd queues.
This delay can be modified using the loopDelay parameter in the x-asynk extended configuration:
<?php
return [
    'application' => [
        'dataSources' => [
            'db' => 'mysql://user:password@localhost'
        ]
    ],
    'x-asynk' => [
        'storage' => 'db',
        // 90-second delay between two checks
        'loopDelay' => 90
    ],
];
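To show where such a delay fits in, here is a simplified polling loop: it processes tasks one by one while any are available and only pauses when the source is empty. This is a sketch under assumptions, not the code of the Asynk worker; `pollLoop`, its callables and the `$maxIdle` bound (added so that the example terminates) are all hypothetical.

```php
<?php
/**
 * Polling loop sketch. $fetch returns the next task or null, $process handles
 * one task, $sleep pauses for a number of seconds. $maxIdle bounds the number
 * of empty polls so that the example terminates; a real worker would loop
 * forever. Returns the number of tasks processed.
 */
function pollLoop(callable $fetch, callable $process, callable $sleep, int $loopDelay = 60, int $maxIdle = 1): int
{
    $done = 0;
    $idle = 0;
    while ($idle < $maxIdle) {
        $task = $fetch();
        if ($task === null) {
            // Nothing to do: wait before checking the source again.
            $idle++;
            $sleep($loopDelay);
            continue;
        }
        $process($task);
        $done++;
    }
    return $done;
}

// Simulated task source and a fake sleep that only records the delay.
$queue = ['a', 'b', 'c'];
$slept = [];
$count = pollLoop(
    function () use (&$queue) { return array_shift($queue); },
    function ($task) { /* handle the task here */ },
    function (int $seconds) use (&$slept) { $slept[] = $seconds; },
    90
);
var_dump($count); // int(3)
var_dump($slept); // one pause of 90 seconds, after the source was found empty
```

With this structure the delay only affects how quickly a newly arrived task is noticed once the source has run dry; it does not slow down the processing of tasks that are already waiting.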
5.3 Supervisor configuration
The use of Supervisor is optional, but it can be useful to ensure that workers are running, and that they are restarted if a problem occurs.
Copy the file etc/asynk/supervisor.conf to /etc/supervisor/conf.d/asynk.conf with the following command:
sudo cp /path/to/project/etc/asynk/supervisor.conf /etc/supervisor/conf.d/asynk.conf
Then modify the following parameters in the file:
- command: Enter the correct path to the root of your project.
  Example: command=/path/to/project/bin/comma AsynkWorker
- numprocs: Set the number of workers to be executed at the same time.
  Example: numprocs=5
Then force Supervisor to take this configuration into account:
sudo supervisorctl reread
sudo supervisorctl update