5.7. Cascade DataHub API Code Examples

These code examples are provided in source form as part of the Cascade DataHub API, normally installed in the /usr/cogent/src/datahub directory. They are provided here so that you can see what is involved in communicating with the DataHub using your own C programs. The Gamma programming language also contains many 'hooks' into the DataHub. For more information about using Gamma with the DataHub see the DataHub demo program that is available for download from the Cogent web site. The Cascade DataHub demo uses Gamma to show the features of the DataHub. If you download and run the Gamma demo for the DataHub you will have to reinstall your commercial version of the DataHub, because the version that comes with the Gamma demo contains a time limited license.

5.7.1. Reading from the Cascade DataHub

/*
 * Cascade DataHub point reader:  readpt
 *
 * (C) Copyright Cogent Real-Time Systems Inc., 1997.  All rights reserved.
 *
 * This program reads a point from the Cascade DataHub and displays the
 * result on the standard output.
 *
 * This program is supplied with the Cascade DataHub programming API.  It
 * may be copied or modified, in whole or in part, for the sole purpose of
 * creating applications to be used with the Cascade DataHub.
 */

#include	<stdio.h>
#include	<stdlib.h>
#include	<unistd.h>
#include	<string.h>
#include	<time.h>

#include	<cogent.h>

/*
 * A point is one of three types.  We discover which one we have and
 * print the appropriate name, value, confidence factor, lock status
 * and security level.
 */

void print_point (PT_pCPOINT ppoint, int brief)
{
	char		*t;

	printf ("Point: %s\n", ppoint->name);

	switch (ppoint->type)
	{
		case PT_TYPE_INT32:
			printf ("Value: %ld\n", (long)ppoint->value.i);
			break;
		case PT_TYPE_REAL:
			printf ("Value: %g\n", ppoint->value.r);
			break;
		case PT_TYPE_STRING:
			printf ("Value: %s\n", ppoint->value.s);
			break;
		default:
			printf ("Value: ...unknown point type!\n");
			break;
	}
	if(!brief)
	{
		t = ctime ((time_t*)&ppoint->seconds);
		t[19] = '\0';
		printf ("Time:  %s.%03ld\n", &t[4],
				(long)ppoint->nanoseconds / 1000000);
		printf ("Conf:  %d\n", ppoint->conf);
		printf ("Lock:  %d\n", ppoint->locked);
		printf ("Secur: %d\n", ppoint->security);
	}
}

#ifdef __USAGE
Copyright (C) Cogent Real-Time Systems Inc., 1996-1997

%C [-b] [-d domain] pointname

   -b    brief output (only the point name and value)
   -d    domain, replaces the default

Read pointname from the Cascade datahub, the domain
can be specified with -d, or by qualifying the pointname,
as in "domain:thepointname" (the latter technique overriding
the former).
#endif

const char* UT_USAGE =
	"Usage: %s [-b] [-d domain] pointname\n"
	;

const char* UT_HELP =
	"\n"
	"Help:\n"
	"    -b          Brief output (only the point name and value).\n"
	"    -d          Domain, replaces the default.\n"
	"\n"
	"Read pointname from the Cascade datahub, the domain\n"
	"can be specified with -d, or by qualifying the pointname,\n"
	"as in \"domain:thepointname\" (the latter technique overriding\n"
	"the former).\n"
	;

int main (int argc, char** argv)
{
	IP_Msg			*hmsg;
	ST_STATUS		status;
	PT_stCPOINT		point;
	char			*ptname = NULL, *domain = NULL, *myname;
	int				opt;
	int				brief = 0;
	IP_Task			*htask;

	/*
	 * Parse the input arguments.  The only interesting argument is an
	 * alternate datahub domain name.  We really do not need this, as
	 * we could specify the point name as domain:name
	 */
	while ((opt = getopt(argc, argv, "hbd:")) != -1)
	{
		switch (opt)
		{
		  case 'h':
		  	UT_Help(argv[0], UT_USAGE, UT_HELP);
			exit(0);

		  case 'd':
			domain = optarg;
			if (strlen(domain) > 15)
				domain[15] = '\0';
			break;

		  case 'b':
		  	brief = 1;
		  	break;

		  default:
			UT_Usage (argv[0], UT_USAGE, stderr);
			exit(1);
		}
	}

	if (!argv[optind])
	{
		UT_Usage(argv[0], UT_USAGE, stderr);
		exit(1);
	}

	/*
	 * Initialize communication through the Cascade IPC library.  We do
	 * not want other tasks to be notified of the start and stop of
	 * this task, so we do not use IP_NserveInit.  The name server
	 * will never know about this task, so notifications will not be
	 * passed on.  Tasks that attempt to look up their clients in the
	 * name server will treat this task as non-existent.  For example,
	 * Cascade DataHub would not be able to send point exceptions to
	 * this task.
	 */

	if ((myname = strrchr(argv[0], '/')))
		myname++;
	else
		myname = argv[0];
	
	if (!(htask = IP_TaskCreateMe (IP_GetChannelID(), myname, domain,
								   NULL, 0)))
	{
		fprintf (stderr, "Could not initialize Cascade IPC subsystem\n");
		exit (1);
	}

	/*
	 * Create a pre-allocated message structure for use with all
	 * IPC calls.  This includes the DH_* functions.  The API could have
	 * created its own internal message structure, but this would have
	 * left us with no way to control its size or be efficient about
	 * allocation.  This way we do a little more work, but have more
	 * control of what is being allocated.
	 */
	hmsg = IP_MsgCreate (NULL, IP_MsgDefaultSize(), 0);

	while((ptname = argv[optind++]))
	{
		/*
		 * Zero the point structure.  If we do not do this, the address
		 * field could be non-zero, and then the API will take that to be a
		 * cached datahub address.  That might cause a crash.
		 */
		memset (&point, 0, sizeof(point));

		/*
		 * Provide a point name buffer separately from the rest of the point
		 * structure.  There is no way for the API to know what the allocation
		 * status of a point name is, so it will never attempt to free this
		 * buffer, nor write into it.
		 */
		point.name = ptname;

		if ((status = DH_ReadPoint (htask, &point, hmsg, NULL)) != ST_OK)
		{
			fprintf (stderr, "Read \"%s\" failed: %s\n",
				point.name, ST_StatusName (status));
			exit(1);
		}
		else
		{
			print_point (&point, brief);
		}
	}

	return 0;
}

5.7.2. Writing data to the Cascade DataHub

/*
 * Cascade DataHub point writer:  writept
 *
 * (C) Copyright Cogent Real-Time Systems Inc., 1997.  All rights reserved.
 *
 * This program writes a point to the Cascade DataHub.
 *
 * This program is supplied with the Cascade DataHub programming API.  It
 * may be copied or modified, in whole or in part, for the sole purpose of
 * creating applications to be used with the Cascade DataHub.
 */

#include	<stdio.h>
#include	<stdlib.h>
#include	<string.h>
#include	<time.h>
#include	<unistd.h>
#include	<errno.h>

#include	<cogent.h>

#ifdef HAVE_SYS_TIME_H
#include	<sys/time.h>
#endif

const char* UT_USAGE =
	"Usage: %s [-d domain] [-r|-f|-i|-l|-s] [-S security] pointname pointvalue\n"
	;

const char* UT_HELP =
	"\n"
	"Help:\n"
	"    -d domain   Set the domain for the operation.\n"
	"    -r          Write as a real (floating point) number.\n"
	"    -i          Write as a short integer.\n"
	"    -l          Write as a long integer.\n"
	"    -s          Write as a character string.\n"
	"    -S          Set security level for write.\n"
	"\n"
	"Write a point to the Cascade datahub in the given domain.\n"
	"\n"
	"Notes:\n"
	"- Strings containing spaces and special characters must be escaped\n"
  	"    from the shell appropriately.\n"
	"\n"
	"- The type will be guessed if not specified, the order of guessing is:\n"
    "    1) long\n"
    "    2) float\n"
    "    3) string (the default)\n"
  	"    The guess is considered correct if the entire argument is converted.\n"
	"\n"
	"- A long can be given in standard C style, 0x5f (hex), 0647 (oct), etc.\n"
	;

int main (int argc, char** argv)
{
	IP_Msg			*hmsg;
	ST_STATUS		status;
	PT_stCPOINT		point;
	char			*ptname = NULL, *ptvalue = NULL, *domain=NULL, *myname;
	short			type = PT_TYPE_VOID;
	int				security=0;
	IP_Task			*htask;
	int				opt;

	/*
	 * Parse the command line
	 */
	while((opt = getopt(argc, argv, "hd:rilsS:")) != -1)
	{
		switch (opt)
		{
		  case 'h':
		  	UT_Help(argv[0], UT_USAGE, UT_HELP);
			exit(0);
		  case 'd':
			domain = optarg;
			if (strlen(domain) > 15)
				domain[15] = '\0';
			break;
		  case 'r':
			type = PT_TYPE_REAL;
			break;
		  case 'i':
		  case 'l':
			type = PT_TYPE_INT32;
			break;
		  case 's':
			type = PT_TYPE_STRING;
			break;
		  case 'S':
			security = atoi (optarg);
			break;
		  default:
			UT_Usage (argv[0], UT_USAGE, stderr);
			exit (1);
			break;
		}
	}

	/* the last two args better be the point and value */

	if (!argv[optind] || !argv[optind+1])
	{
		UT_Usage(argv[0], UT_USAGE, stderr);
		exit(1);
	}
	ptname	=	argv[optind];
	ptvalue	=	argv[optind+1];
	
	/*
	 * Initialize communication through the Cascade IPC library.  We do
	 * not want other tasks to be notified of the start and stop of
	 * this task, so we do not use IP_NserveInit.  The name server
	 * will never know about this task, so notifications will not be
	 * passed on.  Tasks that attempt to look up their clients in the
	 * name server will treat this task as non-existent.  For example,
	 * Cascade DataHub would not be able to send point exceptions to
	 * this task.
	 */

	if ((myname = strrchr(argv[0], '/')))
		myname++;
	else
		myname = argv[0];
	
	if (!(htask = IP_TaskCreateMe (IP_GetChannelID(), myname, domain,
								   NULL, 0)))
	{
		fprintf (stderr, "Could not initialize Cascade IPC subsystem\n");
		exit (1);
	}

	/*
	 * Set this task's security level.  This level must be greater than or
	 * equal to the security level of the point in the datahub in order
	 * for the write to succeed.  The datahub does not know whether this
	 * task has the right to claim this security level.  That enforcement
	 * is up to the programmer of the user task.
	 */
	IP_TaskSetSecurity (htask, security);

	/*
	 * Create a pre-allocated message structure for use with all
	 * IPC calls.  This includes the DH_* functions.  The API could have
	 * created its own internal message structure, but this would have
	 * left us with no way to control its size or be efficient about
	 * allocation.  This way we do a little more work, but have more
	 * control of what is being allocated.
	 */
	hmsg = IP_MsgCreate (NULL, IP_MsgDefaultSize(), 0);
	
	/*
	 * Zero the point structure.  If we do not do this, the address
	 * field could be non-zero, and then the API will take that to be a
	 * cached datahub address.  That might cause a crash.
	 */
	memset (&point, 0, sizeof(point));

	/*
	 * Provide a point name buffer separately from the rest of the point
	 * structure.  There is no way for the API to know what the allocation
	 * status of a point name is, so it will never attempt to free this
	 * buffer, nor write into it.
	 */
	point.name = ptname;
	point.type = type;
	point.conf = 100;

	/*
	 * Set the time on the point.  If this is not set, then the datahub
	 * will show a zero time.
	 */
#ifdef __QNX__
	{
		struct timespec	tp;
		
		clock_gettime (CLOCK_REALTIME, &tp);
		point.seconds = tp.tv_sec;
		point.nanoseconds = tp.tv_nsec;
	}
#else
	{
		struct timeval	tp;
		gettimeofday (&tp, NULL);
		point.seconds = tp.tv_sec;
		point.nanoseconds = tp.tv_usec * 1000;
	}		
#endif /* __QNX__ */

	/*
	 * Set the value of the point based on the type.
	 */
	switch (point.type)
	{
	  case PT_TYPE_INT32:
	  	point.value.i = strtol(ptvalue, 0, 0);
		break;

	  case PT_TYPE_REAL:
		point.value.r = strtod (ptvalue, NULL);
		break;

	  case PT_TYPE_STRING:
		point.value.s = ptvalue;
		break;

	  case PT_TYPE_VOID:
	  default:
	  {
		  /* try to autodetect type of point */
		  char* eos = 0;

		  /* it's a long if conversion goes to end of string */
		  eos = 0;
		  point.value.i = strtol(ptvalue, &eos, 0);
		  if(*eos == '\0')
		  {
			  point.type = PT_TYPE_INT32;
			  break;
		  }

		  /* it's a double if conversion goes to end of string */
		  eos = 0;
		  point.value.r = strtod(ptvalue, &eos);
		  if(*eos == '\0')
		  {
			  point.type = PT_TYPE_REAL;
			  break;
		  }

		  /* else it's a string */

		  point.type = PT_TYPE_STRING;
		  point.value.s = ptvalue;
		  break;
	  }
	}

	/*
	 * Write the point.  We need a IP_Msg structure and a IP_hTASK in
	 * to provide buffer space and sender identification respectively.
	 */
	if ((status = DH_WritePoint (htask, &point, hmsg, NULL)) != ST_OK)
		printf ("Write point failed: %s\n", ST_StatusName (status));
	
	return (0);
}

5.7.3. Registering for exceptions from the Cascade DataHub

/*
 * Cascade DataHub point waiter:  waiter.c
 *
 * (C) Copyright Cogent Real-Time Systems Inc., 1997.  All rights reserved.
 *
 * This program registers for exceptions on all or selected points
 * in the Cascade DataHub.
 *
 * The program waits in an infinite receive loop for exceptions.
 * Upon receipt of a message the program checks that the message is
 * a Casacde DataHub exception and then parses and prints the point
 * to stdout.
 *
 * Since this program can be made to register for exceptions on multiple
 * points a linked-list facility is used to create a list of point names
 * from the passed args.  This linked list is then walked to register
 * for exceptions on each point individually.
 *
 * This program is supplied with the Cascade DataHub programming API.  It
 * may be copied or modified, in whole or in part, for the sole purpose of
 * creating applications to be used with the Cascade DataHub.
 */

#include	<stdio.h>
#include	<stdlib.h>
#include	<string.h>
#include	<unistd.h>
#include	<time.h>

#include	<cogent.h>

#ifdef HAVE_SYS_KERNEL_H
#include	<sys/kernel.h>
#endif

/*
 * millisecs function converts nanoseconds (10E-9) to milliseconds (10E-3).
 */
static int millisecs(PT_pCPOINT ppoint)
{
	return (ppoint->nanoseconds / 1000000);
}

/*
 * Generate an ASCII representation of the date and time for a given point,
 * truncating the nanoseconds to the next lowest millisecond.  If the time
 * values are 0,0, then generate the string "none".
 */

static char* timeof(PT_pCPOINT ppoint)
{
	static char	ctm[64], msec[16];
	char		*thetime;
	time_t		tm;

	if (ppoint->seconds)
	{
		tm = ppoint->seconds;
		thetime = ctime (&tm);
		strncpy (ctm, thetime, 11);
		strncpy (&ctm[11], &thetime[20], 4);
		strncpy (&ctm[15], &thetime[10], 9);
		ctm[24]='\0';
		sprintf (msec, ".%03d", millisecs(ppoint));
		strcat (ctm, msec);
	}
	else
	{
		strcpy (ctm, "none");
	}
	return (ctm);
}

/*
 * Print point name, value and information to the standard output
 */
void print_point (PT_pCPOINT ppoint)
{
	printf ("Point: %s\n", ppoint->name);
	switch (ppoint->type)
	{
	  case PT_TYPE_INT32:
		printf ("Value: %ld\n", (long)ppoint->value.i);
		break;
	  case PT_TYPE_REAL:
		printf ("Value: %.20g\n", ppoint->value.r);
		break;
	  case PT_TYPE_STRING:
		printf ("Value: %s\n", ppoint->value.s);
		break;
	  default:
		printf ("Value: Unknown\n");
		break;
	}
	printf ("Conf:  %d, Lock: %s, Time: %s, Security: %d\n",
			ppoint->conf, (ppoint->locked ? "yes" : "no"),  timeof(ppoint),
			ppoint->security);
}

#ifdef __USAGE
%C [-d domain] [-q queuename] pointname...
	-d domain - set the domain for the operation
	-q        - name of queue for point changes
	-v        - print debugging information

	Points in another domain may be watched by
	texplicitly naming the domain followed by a colon
	and the point name.
		e.g.,
			waiter mixer:Mixer_1_Weight
		is the same as
	        waiter -d mixer Mixer_1_Weight
#endif

const char* UT_USAGE =
	"Usage: %s [-d domain] [-q queuename] pointname...\n"
	;

const char* UT_HELP =
    "\n"
    "Help:\n"
    "    -h          Print this helpful message.\n"
    "    -d domain   Set the domain for the operation.\n"
    "    -q          Name of queue for point changes.\n"
    "    -v          Print debugging information.\n"
    "\n"
    "   Points in another domain may be watched by\n"
    "   texplicitly naming the domain followed by a colon\n"
    "   and the point name.\n"
    "       e.g.,\n"
    "           waiter mixer:Mixer_1_Weight\n"
    "       is the same as\n"
    "           waiter -d mixer Mixer_1_Weight\n"
	;

int main (int argc, char** argv)
{
	IP_Msg			*hmsg;
	ST_STATUS		status;
	PT_stCPOINT		point;
	char			*ptname = NULL, *qname = NULL, *domain=NULL;
	char			*s, namebuf[256], *name = NULL, *msgend, *myname;
	int				opt;
	int				debugging=0;
	IP_Task			*htask=NULL;
	IP_MsgInfo		msginfo;
	int				type;
	LL_LIST			names;
	LL_stITERATOR	it;

	hmsg = IP_MsgCreate (NULL, IP_MsgDefaultSize(), 0);

	/*
 	 * Create an empty link list 
	 */
	names = LL_New();
	
	while ((opt = getopt(argc, argv, "hd:q:v")) != -1)
	{
		switch (opt)
		{
		  case  'h':
		  	UT_Help(argv[0], UT_USAGE, UT_HELP);
			exit(0);
		  case 'd':
			domain = optarg;
			if (strlen(domain) > 15)
				domain[15] = '\0';
			break;
		  case 'q':
			qname = optarg;
			break;
		  case 'v':
			debugging = 1;
			break;
		  default:
			UT_Usage (argv[0], UT_USAGE, stderr);
			exit(1);
		}
	}
	for(;optind < argc; optind++)
	{
		/* Adds the arg to the end of the linked list */
		LL_AddTail (names, argv[optind]);
	}

	/*
	 * If a queue name is not specified then generate one based on the
	 * process pid
	 */
	if (!qname)
	{
		sprintf (namebuf, "sc/wait%d", getpid());
		qname = strdup (namebuf);
	}
	if ((s = strrchr (argv[0], '/')))
		s++;
	else
		s = argv[0];

	/*
	 * Generate a registered name based on the process pid
	 */
	if (!name)
	{
		sprintf (namebuf, "sc/wait%d", getpid());
		name = strdup (namebuf);
	}
	
	/*
	 * Initialize this process with the name server and queue server
	 */
	if ((myname = strrchr(argv[0], '/')))
		myname++;
	else
		myname = argv[0];
	
	if (!(htask = IP_NserveInit (myname, domain, qname, 0, 0)))
	{
		printf("IP_NserveInit() failed: are qserve and nserve running?\n");
		exit(1);
	}

	memset (&point, 0, sizeof(point));
	
	/*
	 * Traverse the link list of points (if passed) and register for
	 * exceptions on each.  Otherwise register for all exceptions.
	 */
	if (LL_Count(names))
	{
		LL_TRAVERSE (names, char*, ptname, it)
		{
			printf ("Register %s\n", ptname);
			point.name = ptname;
			point.conf = 0;
			point.address = NULL;
			if ((status = DH_RegisterPoint (htask, &point, hmsg, NULL))
				!= ST_OK)
				printf ("Register point failed: %d\n", status);
		}
	}
	else
	{
		if ((status = DH_RegisterAllPoints (htask, NULL, 1,
											hmsg, NULL)) != ST_OK)
		{
			printf ("Register all points failed: %d\n", status);
			exit (-1);
		}
	}

	/*
	 * Provide a point name buffer separately from the rest of the point
	 * structure.  There is no way for the API to know what the allocation
	 * status of a point name is, so it will never attempt to free this
	 * buffer, nor write into it.
	 */
	point.name = namebuf;

	/*
	 * Infinite event loop
	 */
	for (;;)
	{
		/*
		 * Sit receive blocked
		 */
		type = IP_Receive (htask, hmsg, &msginfo);
		if (debugging)
			printf ("Received: %s\n", (char*)IP_MsgData (hmsg));

		/*
		 * If the message is an exception notice from the Cascade DataHub
		 * then parse and print the point.  We could get messages from other
		 * sources as well, which we effectively ignore.  If any process
		 * attempts to Send a message to this task, we just send back a nil
		 * response.
		 */
		switch (type)
		{
		  case IP_ASYNC:			/* DataHub point, probably */
			switch (IP_MsgSubtype (hmsg))
			{
			  case ST_DH_EXCEPTION:
				/* Deal with exceptions by parsing and printing the result */
				for (msgend = IP_MsgData(hmsg); *msgend; )
				{
					DH_ParsePointString (&point, point.name, msgend,
										 &msgend, NULL);
					print_point (&point);
				}
				break;
			  case ST_DH_ECHO:
				/* We never write a point, so ECHO is impossible in this
				 * app.  If we expected echoes, we could handle them in
				 * the same way as exceptions.  The message contents are
				 * identical for both. */
				break;
			  default:
				/* Async general ASCII message, sent by another task using
				 * the Cascade DataHub API.  One such task is the Cascade
				 * name server, nserve.  It will send taskstarted and
				 * taskdied messages when other tasks start and stop:
				 * (taskstarted <name> <domain> <queue> <node> <pid> <chid>)
				 * (taskdied <name> <domain> <queue> <node> <pid> <chid>)
				 */
				break;
			}
			break;
		  case IP_ERROR:
			/* Receive returned zero, which should never happen. */
			break;
		  case IP_SYSTEM:
			/* A task death message that can only be received by a task
			 * that has set its QNX INFORMED bit.  You should avoid this,
			 * and use the taskdied and taskstarted messages in the
			 * IP_ASYNC section if possible. */
			break;
		  case IP_SIGNAL:
			/* A signal caused the Receive to exit prematurely.  No message
			 * was received.  The value of sender is not defined. */
			break;
		  case IP_NONE:
			/* The queue reported a message waiting, but none was available.
			 * This happens if the application intentionally drains the queue,
			 * which is generally a bad idea. */
			break;
		  case IP_SYNC:
			/* Synchronous general ASCII message, sent by another task using
			 * the Cascade DataHub API. */
			IP_MsgCascade (hmsg, "Unsupported", 12, IP_NONE, ST_ERROR);
			IP_MsgInfoReply (&msginfo, hmsg);
			break;
		  case IP_RAW:
			/* Message sent using QNX Send command, not through the
			 * Cascade DataHub API.  This is also where a proxy would
			 * be delivered if we had set up a proxy within this task,
			 * for example, a periodic timer.  In that case, the proxy
			 * ID would be (pid_t)sender, and we would not Reply to it. */
			IP_MsgInfoReplyRaw (&msginfo, "Unsupported", 12);
			break;
		}
	}

	return 0;
}

	

5.7.4.  A sample makefile definition

COGLIB = -l cogdb
CFLAGS = -Oneatx -Q

all: readpt writept waiter

readpt: readpt.c
        cc -o $@ $< $(COGLIB)

writept: writept.c
        cc -o $@ $< $(COGLIB)

waiter: waiter.c
        cc -o $@ $< $(COGLIB)

clean:
        rm -f *.o readpt writept waiter

.PHONY: clean
.IGNORE: clean