The Cogent IPC layer is a generalization of QNX 4's send/receive/reply IPC layer. Cogent IPC provides many services that let users easily build systems that would be complex to code in C. These services include:
network-wide name registration service
Cascade DataHub exceptions and echos
true asynchronous messages
pseudo-asynchronous messages
synchronous messages
QNX 4 IPC messages
task start notification
task death notification
automatic handling of receive/reply for Cogent IPC messages
remote procedure calls
The Cogent IPC layer relies on two supporting services provided to the Gamma developer: nserve and qserve. These services run as programs on the same CPU or network as Gamma.
The nserve command is the Cascade NameServer module. Although similar in concept to the QNX 4 nameloc program, this name database has some differences that make it worth using.
The nserve module is run on every node requiring name services. Every nserve module is updated on an event basis, rather than on a timed basis as QNX 4's nameloc is, so discrepancies between multiple nserve modules on a network are rare.
The qserve program is the asynchronous queue manager for Cogent IPC. Queues are used in Cogent IPC to implement asynchronous communication channels between two programs. The qserve module is run on every node requiring Cogent queue services.
The Cogent IPC layer provides many advanced services that augment the basic send/receive/reply protocol. This section describes those services.
The Cogent IPC layer provides a messaging protocol that is easier to use than, and different in format from, raw QNX 4 send/receive/reply.
Messages between Cogent IPC-enabled tasks are very similar to function calls. A message is constructed and sent, and the task on the other end evaluates the message. The return value of the evaluation of the message is transmitted to the originating task in the reply.
Consider two Gamma modules using the following code:
Task A:
#!/usr/cogent/bin/gamma init_ipc("task_a"); while (t) { next_event(); }
The function init_ipc is called first to initialize Cogent interprocess communication. For more details, see IPC Initialization below.
Task B:
#!/usr/cogent/bin/gamma init_ipc("task_b"); function ask_taska_date () { local result,tp; if (tp = locate_task("task_a",nil)) result = send(tp,#date()); else result = "could not locate task A"; } every(1.0,#princ(ask_taska_date(),"\n")); while (t) { next_event(); }
Of specific note in this example is the format of the message in the send function. The first argument to the Cogent IPC function send is a task. The locate_task function, along with the nserve module, provides the name lookup. The second argument is an expression for the receiver to evaluate. For simple sends, an unevaluated Gamma expression (using #) will suffice. For more complex sends, such as when a partially evaluated list of arguments needs to be passed, the message should be constructed as a Lisp list.
This code gives a good example of using the Cogent IPC layer as an RPC (Remote Procedure Call) mechanism.
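To make the difference between the two message forms concrete, here is a sketch; it assumes task_a from the example above is running, and the variable x is purely illustrative. With #, the whole expression travels unevaluated; with a constructed list, the sender substitutes current values into the message before transmission (a pattern developed further in the next example):

#!/usr/cogent/bin/gamma

init_ipc("send_forms");

x = 42;

if (tp = locate_task("task_a", nil))
{
    /* Unevaluated expression: task_a evaluates date() itself. */
    result = send(tp, #date());

    /* Partially evaluated list: the sender's current value of x
       is placed into the message, so task_a evaluates (setq x 42). */
    result = send(tp, list(#setq, #x, x));

    close_task(tp);
}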
To use the Cogent IPC layer for transferring data between tasks, use the Lisp expression for assignment: setq. An example is:
Task C:
#!/usr/cogent/bin/gamma init_ipc("task_c"); add_set_function(#x,#princ("Task C reports x=",x,"\n")); while (t) { next_event(); }
Task D:
#!/usr/cogent/bin/gamma init_ipc("task_d"); function inc_x () { local result,tp; x++; if (tp = locate_task("task_c",nil)) result = send(tp,list(#setq, #x, x)); } x = 0; every(0.1,#inc_x()); while (t) { next_event(); }
In this example, task C sets up a set function before starting its event loop. The set function prints the value of x whenever it changes. Task D initializes x to 0 and then starts a timer that runs every tenth of a second, incrementing x and sending task C setq expressions such as:
(setq x 1)
(setq x 2)
(setq x 3)
(setq x 4)
These expressions are in Lisp format because all messages between processes use the Lisp internal representation for efficiency.
The setq function is evaluated in task C. Any side effects of the function, for example the setting of the variable x, happen in task C. The return value of the function is the content of the reply message. The return value of the send function can be found by evaluating the result variable in the inc_x function.
Consider the inc_x function re-written as:
function inc_x ()
{
    local result, tp;
    x++;
    if (tp = locate_task("task_c", nil))
    {
        result = send(tp, list(#setq, #x, x));
        princ("task D result of send: ", result, "\n");
    }
}
When this example is run, the return value of the send is shown to be the result of the setq function. Notice that task D must wait for task C to receive and evaluate the message and send back the response before it can continue.
Consider two tasks that wish to communicate: task E and task F. Task F is a time-sensitive task that needs to deliver a package of data to task E. Task F cannot take the chance that task E will accept its data immediately and issue a reply so that it may continue with its own jobs. In short, a synchronous send compromises task F's job, because task F must wait for task E to respond before proceeding.
To send data asynchronously from task F to task E, a queue is used. Data is sent from task F to the queue. The queue replies immediately to task F, freeing it to continue. Then a proxy, a special non-blocking message, is sent from the queue to task E. Upon receipt of the proxy, task E knows that the queue contains data for it. When task E is ready, it asks the queue for the data.
With some small changes, the example from the previous section can be changed from synchronous messaging to asynchronous, as follows:
Task E:
#!/usr/cogent/bin/gamma init_ipc("task_e","task_e_q"); add_set_function(#x,#princ("Task E reports x=",x,"\n")); while (t) { next_event(); }
Task F:
#!/usr/cogent/bin/gamma init_ipc("task_f","task_f_q"); function inc_x () { local result,tp; x++; if (tp = locate_task("task_e",nil)) { result = send_async(tp,list(#setq, #x, x)); princ("task F result of send: ",result,"\n"); } } x = 0; every(0.1,#inc_x()); while (t) { next_event(); }
The init_ipc function calls at the beginning of each module now register a queue name with qserve, and the inc_x function has been changed to use send_async instead of send.
When this example is run, the results show that task F receives a return value of t (true), indicating that the message was delivered, but it does not have to wait for task E to evaluate the expression and generate a result.
Using asynchronous communication immediately solves the deadlock problem that all developers of multi-module systems must eventually face. To the developer, the use of asynchronous communication in Gamma entails only a slightly different function: send_async instead of send.
For situations where the qserve program is not running but a non-blocking IPC call is still required, Gamma's pseudo-asynchronous IPC call, isend, can be used.
The isend function sends a message between two Cogent IPC-enabled tasks. Immediately upon receipt of the message, the receiver replies that the message was received; the return value of evaluating the message is not sent back.
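As a sketch, the inc_x function from the earlier example could be rewritten to use isend in place of send. This assumes isend takes the same task and message arguments as send, which the description above suggests but does not state outright:

function inc_x ()
{
    local result, tp;
    x++;
    if (tp = locate_task("task_c", nil))
    {
        /* The receiver replies on receipt, so result indicates
           delivery only, not the value of the setq expression. */
        result = isend(tp, list(#setq, #x, x));
    }
}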
When a task registers a name with nserve, it can thereafter receive information regarding any other nserve-registered task that starts or stops.
This is done by defining two specially named functions in the task's own code to handle this information. The functions are:
function taskstarted_hook (name, queue, domain, node, id);
and
function taskdied_hook (name, queue, domain, node, id);
The body of each of these functions is up to the programmer. Most hook functions check the name, queue, and possibly the domain of the started or stopped task, and then take a specific action (a minimal sketch follows the list), such as:
restarting a task that has died;
informing the user that a module has died;
informing other modules that a new service is available;
querying the new module for information; and,
starting or stopping a Cascade DataHub.
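As a minimal sketch, a monitoring task could define both hooks simply to report task starts and deaths on the console. The function signatures are as given above; the bodies and the task name "monitor" are purely illustrative:

#!/usr/cogent/bin/gamma

init_ipc("monitor");

function taskstarted_hook (name, queue, domain, node, id)
{
    princ("Task started: ", name, " (queue ", queue,
          ") on node ", node, "\n");
}

function taskdied_hook (name, queue, domain, node, id)
{
    princ("Task died: ", name, " on node ", node, "\n");
}

while (t)
{
    next_event();
}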
The following Gamma functions automatically handle QNX 4 receive/reply:
PtMainLoop
next_event
next_event_nb
flush_events
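All of the examples in this chapter block in next_event. Judging by its name, next_event_nb is the non-blocking variant (an assumption, not stated above), which would allow a task to poll for IPC events while doing other work; do_other_work below is a hypothetical application function:

/* Blocking idiom used throughout this chapter. */
while (t)
{
    next_event();
}

/* Polling idiom: handle any pending IPC, then carry on.
   Assumes next_event_nb returns immediately when no event
   is pending. */
while (t)
{
    next_event_nb();
    do_other_work();
}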
Before any form of Cogent interprocess communication occurs, there must be a call to the init_ipc function. This function opens the channels of communication between Gamma and other tasks powered by Gamma, Cascade Connect, or other Cogent products. With this function you determine your task's name and, optionally, its queue name and domain.
A program's name is the string registered with the nserve program. Gamma task names and queue names should be unique on the network. A program's queue name is the name of the queue registered on the program's behalf if it wants to participate in asynchronous communication using Cogent's qserve utilities. The domain name is the name of the default Cascade DataHub domain from which to read and write points.
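Putting the three names together, a task that registers a task name, a queue name, and a default DataHub domain might begin as follows. This is a sketch: the order of the first two arguments follows the two-argument calls shown earlier, while the assumption that the domain comes third, and the domain string "plant" itself, are illustrative:

#!/usr/cogent/bin/gamma

/* Register the task name, its queue name, and (assumed third
   argument) its default Cascade DataHub domain. */
init_ipc("task_g", "task_g_q", "plant");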
It is typical to find the init_ipc function called within the first few calls in the program. Here's an example:
#!/usr/local/bin/gamma

require_lisp("PhotonWidget");
require_lisp("PhabTemplate");

myname = car(argv);
init_ipc(myname);
The first line of this program segment names the engine that will run it; the next lines load some required files for Photon widget manipulation and Photon Application Builder support. The argv variable holds the arguments passed to Gamma. The first item in the list is the name of the executable, which is put in the myname variable. The init_ipc function is then called with the variable myname, so the registered name is whatever the name of the program happens to be.
Using Gamma's IPC communications protocol, a task can be located by name or by id. This protocol allows for synchronous, asynchronous, and pseudo-asynchronous communication between Gamma, SCADALisp, and other Cogent products such as Cascade Connect and the Cascade DataHub.
Locating a task by name can be done with the locate_task function. This is similar to using the qnx_name_locate function except that, since names registered with nserve are intended to be unique on a network, the node number need not be specified.
marko:/home/marko$ gamma -q
Gamma> init_ipc("locate_test");
t
Gamma> tp = locate_task("cadsim",nil);
#< Task:13424 >
The return value of the locate_task function is a Gamma task type. The task type is an internal representation of the task that was located. There is nothing the user can do with variables of this data type other than to pass them through as arguments to Cogent IPC functions.
To locate a task on a specific node with a specific PID number, use the locate_task_id function.
Before using either locate_task or locate_task_id, the init_ipc function must have already been called.
Once communication with a task is complete, the channel should be closed using the close_task function.
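Taken together, a typical exchange locates a task by name, sends to it, and closes the channel when done. The sketch below uses only functions described above; task_a is the name registered in the first example of this chapter:

#!/usr/cogent/bin/gamma

init_ipc("one_shot");

function query_date ()
{
    local tp, result;
    if (tp = locate_task("task_a", nil))
    {
        result = send(tp, #date());
        close_task(tp);
    }
    result;
}

princ(query_date(), "\n");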
The send_string and send_string_async functions are used to format a message to be sent to a non-Cogent IPC task. These functions will accept a string (text surrounded by quotes) as a parameter, and will send the contents of the string without the enclosing quotes. Note that the normal send function will send the enclosing quotes as part of the message.
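For example, here is a sketch of delivering the bare text hello. The name "raw_task" is hypothetical, and a task that does not register with nserve would instead be found with locate_task_id:

if (tp = locate_task("raw_task", nil))
{
    /* send(tp, "hello") would transmit the quotes as part of the
       message; send_string transmits only the five characters hello. */
    send_string(tp, "hello");
    close_task(tp);
}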
The Cascade DataHub is a high performance data collection and distribution center designed for easy integration with a Gamma application. Just as QNX 4 is an excellent choice for developers of systems that must acquire real-time data, the Cascade DataHub is the right choice for distribution of that data.
The Cascade DataHub provides:
data services to its clients, both by exception and by lookup;
asynchronous data delivery that protects client tasks from blocking;
handling of network connection and reconnection issues;
data services to many clients at once;
transparent data services to/from Gamma;
flexible data tag names;
inherent understanding of data types (as Gamma does);
time-stamping of data;
C libraries for the creation of custom clients;
security access levels on data points; and,
a confidence value for assigning fuzzy values to data points.
The Cascade DataHub is:
a convenient way to disseminate real-time data;
a RAM resident module holding current data;
a proven solution with thousands of hours of installed performance; and,
a great source of information for:
historical and relational databases;
hard disk loggers; and,
Cascade Connect, the real-time connection to MS-Windows.
The Cascade DataHub is not:
a historical database;
a relational database;
a hard disk logger;
slow;
a module with large memory requirements; or,
pre-configured.
Whenever multiple tasks are communicating there is a chance of a deadlock situation. The Cascade DataHub is at the center of many mission-critical applications because it provides real-time data to its clients without the threat of being blocked on a receiving task. The Cascade DataHub never blocks on a task that is busy. The datahub is always able to receive data from clients because it uses the qserve manager to handle outgoing messages: the DataHub only ever sends messages to the Cascade QueueServer program, which is optimized to never enter a state where it cannot accept a message from the Cascade DataHub.
When a new data point is sent to the Cascade DataHub, the datahub automatically updates its clients that are interested in the point. Some clients get information from the datahub on request only, by polling. Other clients register with the Cascade DataHub to be notified of changes in some or all points; these change notifications are called exceptions.
The Cascade DataHub not only allows its clients to register and receive exceptions on data points, but also provides a special message type called an echo that is extremely important in multi-node or multi-task applications.
When the Cascade DataHub receives a new data point it immediately informs its registered clients of the new data value. The clients receive an asynchronous exception message. In some circumstances, the client that sent the new data value to the datahub is also registered for an exception on that point. In this case, the originator of the data change will also receive an exception indicating the change. When there are multiple clients reading and writing the same data point, a client may wish to perform an action whenever another client changes the data. Thus, it must be able to differentiate between exceptions that it originated itself and ones that originate from other clients. The Cascade DataHub defines an echo as an exception returned to the originator of the value change.
In certain circumstances, the lack of differentiation between exceptions and echos can introduce instability into both single and multi-client systems. For example, consider an application that communicates with another Lisp or MMI system, such as Wonderware's InTouch. InTouch communicates via DDE, which does not make the distinction between exceptions and echos. A data value delivered to InTouch will always be re-emitted to the sender, which will cause the application to re-emit the value to the Cascade DataHub. The Cascade DataHub will generate an exception back to the application, which will pass it to InTouch, which will re-emit the value to the application, which will send it to the datahub, and so on. A single value change will cause an infinite communication loop. There are many other instances of this kind of behavior in asynchronous systems. By introducing echo capability into the Cascade DataHub, the cycle is broken immediately, because the application can recognize that it should not re-emit a data change that it originated itself.
The echo facility is necessary for another reason: it is not sufficient simply to suppress the echo to the originating task. If two tasks read and write a single data point in the datahub, then the datahub and both tasks must still agree on the most recent value. When the two tasks write the point at different times, the non-writing task gets an exception and updates its current value to agree with the datahub and the sender. But if both tasks simultaneously emit different values, then the task whose message is processed first will get an exception carrying the second task's value, and the second will get an exception carrying the first's. In effect, the two tasks will swap values, and only one will agree with the datahub. The echo message solves this dilemma by allowing the task whose message was processed second to receive its own echo, causing it to realize that it had overwritten the exception from the other task.
Copyright © 1995-2010 by Cogent Real-Time Systems, Inc. All rights reserved.