/*********************************************************************************************************
"Ensemble" is the proprietary property of The Regents of the University of California ("The Regents.")

Copyright (c) 2005-10 The Regents of the University of California, Davis campus. All Rights Reserved.

Redistribution and use in source and binary forms, with or without modification, are permitted by
nonprofit, research institutions for research use only, provided the conditions in the included
license agreement are met.

Refer to the file "ensemble_license.txt" for the license agreement, located in the top level directory
of this distribution.
**********************************************************************************************************/

/*
*
* This was an attempt to use the matrpc code to establish communication
* between Ensemble and LabView. Although it hasn't been used, technically
* it should work. This effort is on hold for now.
*
* Stefan Tomic 12/07
*
*
* Accepts socket connections and forwards commands to matlab
* engines via matlab's C API, then returns matlab output over the socket.
* Multi-threaded to handle multiple requests quickly, in parallel.
*
* The reason for this program is that matlab does not provide an API
* for php, which is what we're using. This program is intended to be
* a dumb relay between php (or any other client using tcp/ip sockets),
* and the matlab engine.
*
* Listening port is defined at startup, and a single instance of this server
* can listen on a range of ports simultaneously if required.
*
* A new socket connection must be made for each matlab command.
* Socket commands are of the form "<sessionID>|<command>"
* where <sessionID> is any alphanumeric string defined by the client
* and <command> is an exact literal command passed directly to matlab.
* Returned is the string value of the "outstr" variable in the Matlab workspace.
* If this variable is not type string, an error (-1) will be returned.
* jg this will be changed
*
* Sessions are implicitly created when a novel sessionID is encountered.
* Each session maintains its own matlab engine (workspace), and
* will destroy its matlab object after a timeout period of inactivity.
* Sessions can also be explicitly closed by the client with this:
* "<sessionID>|close_session", to free matlab license and resources.
*
* This is a standalone program that can be run as a daemon. All output
* is sent to stdout, and verbose level can be adjusted via optional
* startup argument (see usage:). Standard kill/shutdown signals are handled.
*
* Note: see the matlab-specific build command in the Makefile
*
* Note: ld will look for "libeng.so" at runtime, so you might have to
*       add the location of this matlab .so file to LD_LIBRARY_PATH
*
* Note: It is the client's responsibility to ensure that the "ans" Matlab
*       variable contains a string value after each call. Data type mismatches
*       will return an error code (-1), and the matlab session will be closed.
*
* Rev:
* - created July 2005 by Jason Golubock for Janata lab, UC Davis
* 07/19/05 PJ - added proper conversion of Matlab output variable to string
* 09/18/05 PJ - added call to rehash toolbox, time-stamping during
*               creation of new session in order to avoid accidental
*               engine destruction.
* 03/16/06 ST - Added support for session init and terminate control strings,
*               made NULL and non-string results non-terminating and
*               non-fatal conditions.
* 03/28/06 PJ - modified calls to Matlab engine to return data into * outstr rather than Matlab's "ans" variable * July 06 JG - added support for different return data types (not just string) * 08/19/09 ST - added #include to avoid "implicit declaration" warnings with gcc 4.1.2 */ #include #include #include #include #include #include #include #include #include #include #include #include #include "/usr/local/matlab2009a/extern/include/engine.h" #define SOCKET short #define MAX_SESSIONS 1 /* max simultaneous matlab engines */ #define MAX_LISTENERS 1 /* each listener is assigned to a single port */ #define SESSION_TIMEOUT_SECONDS 9000 /* 2.5 hours */ #define MAX_Q_CONNECTIONS 1000 /* socket connect requests are queued */ #define MAX_SOCKET_BIND_ATTEMPTS 10 /* number of times to try binding to a socket */ /* control string constants */ #define CTRL_LABVIEW_UNPAUSE "254" #define CTRL_LABVIEW_PAUSE "255" typedef struct { SOCKET sock; int port; } QueryThreadParams; typedef struct { SOCKET listenSock; int listenPort; int threadNumber; } ListenThreadParams; typedef struct { //Engine *matEngine; int lvSockfd; pthread_mutex_t sessionMutex; time_t tstamp; char sessionID[128]; } Session; /* GLOBAL */ pthread_t threadHandles[MAX_LISTENERS + 1]; /* one more for HK thread */ Session sessions[MAX_SESSIONS]; pthread_mutex_t sessionArrayMutex; volatile int shutdownSignal; int startingPort; int numListenerThreads; int verbose; int lvPort; char *lvHostname; struct tm *tm; /* PROTOTYPE */ void closeSession( int ); void shutdownServer( void ); void dispErrorMessage( char * ); void printTime( void ) { time_t now; time( &now ); tm = localtime( &now ); printf( "%s", asctime( tm ) ); } void *housekeepingThread( void *params ) { if( verbose > 0 ) printf( "\t[housekeeping thread] on the job\n" ); int x; time_t now; while( !shutdownSignal ) { time( &now ); for( x = 0; x < MAX_SESSIONS; x++ ) { if( sessions[x].lvSockfd < 0) continue; if( now - sessions[x].tstamp > SESSION_TIMEOUT_SECONDS ) { if( 
verbose > 0 ) { printTime(); printf( "\t[housekeeping] destroying matlab engine %d\n", x ); } pthread_mutex_lock( &sessions[x].sessionMutex ); //ST housekeeping (close socket here?) sessions[x].lvSockfd = -1; //engClose( sessions[x].matEngine ); //sessions[x].matEngine = NULL; sessions[x].sessionID[0] = '\0'; pthread_mutex_unlock( &sessions[x].sessionMutex ); } } sleep( 5 ); } if( verbose > 0 ) { printTime(); printf( "\t[housekeeping thread] exiting\n" ); } } int getSessionNumber( const char sessionID[] ) { int sessionNumber,lvSockfd,lvPortno; struct hostent *lvServer; struct sockaddr_in lvServ_addr; pthread_mutex_lock( &sessionArrayMutex ); for( sessionNumber = 0; sessionNumber < MAX_SESSIONS; sessionNumber++ ) if( strcmp( sessions[sessionNumber].sessionID, sessionID ) == 0 ) break; /* this session ID does not yet exist in our list, or it does exist but its session has been closed */ if( sessionNumber == MAX_SESSIONS || sessions[sessionNumber].lvSockfd < 0) { /* find next empty slot */ for( sessionNumber = 0; sessionNumber < MAX_SESSIONS; sessionNumber++ ) if( sessions[sessionNumber].lvSockfd < 0 ) break; if( sessionNumber == MAX_SESSIONS ) { pthread_mutex_unlock( &sessionArrayMutex ); return( -1 ); } if( verbose > 0 ) { printTime(); printf("\t[query thread] creating matlab eng %d for session ID %s\n", sessionNumber, sessionID ); } pthread_mutex_lock( &sessions[sessionNumber].sessionMutex ); strcpy( sessions[sessionNumber].sessionID, sessionID ); //ST add socket creation code here lvSockfd = socket(AF_INET,SOCK_STREAM,0); if(lvSockfd < 0) dispErrorMessage("[query thread] ERROR opening LabView socket"); else sessions[sessionNumber].lvSockfd = lvSockfd; lvServer = gethostbyname(lvHostname); if(lvServer == NULL) dispErrorMessage("[query thread] ERROR, no such LabView host\n"); //zero out lvServ_addr bzero((char *) &lvServ_addr,sizeof(lvServ_addr)); lvServ_addr.sin_family = AF_INET; bcopy((char *) lvServer->h_addr,(char 
*)&lvServ_addr.sin_addr.s_addr,lvServer->h_length); lvServ_addr.sin_port = htons(lvPort); if(connect(lvSockfd,(struct sockaddr *)&lvServ_addr,sizeof(lvServ_addr)) < 0) dispErrorMessage("[query thread] ERROR connecting to LabView socket\n"); //sessions[sessionNumber].matEngine=engOpen("matlab -nosplash -nodisplay -nojvm"); //engEvalString( sessions[sessionNumber].matEngine, "rehash toolbox"); time( &sessions[sessionNumber].tstamp ); pthread_mutex_unlock( &sessions[sessionNumber].sessionMutex ); } pthread_mutex_unlock( &sessionArrayMutex ); return( sessionNumber ); } void *queryThread( void *params ) { QueryThreadParams *qtp = (QueryThreadParams *) params; int bufLength = 255; char inbuf[bufLength+1]; char outbuf[bufLength+1]; char lvInBuffer[bufLength+1]; int bytecount; bytecount = recv( qtp->sock, inbuf, bufLength, 0 ); if( bytecount >= bufLength ) { if( verbose > 0 ) { printTime(); printf( "\t[query thread] socket command too long\n" ); } bytecount = send( qtp->sock, "-1", 2, 0 ); shutdown( qtp->sock, SHUT_RDWR ); close( qtp->sock ); free( qtp ); return; } inbuf[bytecount] = '\0'; if( verbose > 1 ) { printTime(); printf( "\t[query thread] port %d socket %d received: %s\n", qtp->port, qtp->sock, inbuf ); } /* parse out session ID and matlab command */ int x, pipePosition, sessionNumber; char *command; for( pipePosition = 0; pipePosition < bytecount; pipePosition++ ) if( (char) inbuf[pipePosition] == '|' ) { command = &(inbuf[pipePosition+1]); break; } if( pipePosition == bytecount ) { if( verbose > 0 ) { printTime(); printf( "\t[query thread] error: no '|' character found\n" ); } bytecount = send( qtp->sock, "-1", 2, 0 ); shutdown( qtp->sock, SHUT_RDWR ); close( qtp->sock ); free( qtp ); return; } char sessionID[128]; strncpy( sessionID, inbuf, pipePosition ); sessionID[pipePosition] = '\0'; sessionNumber = getSessionNumber( sessionID ); if( sessionNumber < 0 ) { if( verbose > 0 ) { printTime(); printf( "\t[query thread] error: no empty sessions available\n" ); } 
bytecount = send( qtp->sock, "-1", 2, 0 ); shutdown( qtp->sock, SHUT_RDWR ); close( qtp->sock ); free( qtp ); return; } /* Control string for pausing labview. Free up the thread here by calling closeSession */ if(strncmp(command,CTRL_LABVIEW_PAUSE,strlen(CTRL_LABVIEW_PAUSE))==0) { closeSession( sessionNumber ); bytecount = send( qtp->sock, "ok", 2, 0 ); shutdown( qtp->sock, SHUT_RDWR ); close( qtp->sock ); free( qtp ); return; } //zero out the LabView input buffer bzero(lvInBuffer,sizeof(lvInBuffer)+1); if( shutdownSignal ) return;/*this isn't really necessary but might help */ /* dispatch command to matlab */ pthread_mutex_lock( &sessions[sessionNumber].sessionMutex ); if( sessions[sessionNumber].lvSockfd >= 0 ) { /*engOutputBuffer( sessions[sessionNumber].matEngine, outbuf, bufLength ); */ if (verbose > 1) { printTime(); printf("\t[query thread] sending command (%s) to LabView\n", command); } /* Initialize the outstr variable on the workspace*/ //engEvalString( sessions[sessionNumber].matEngine, "outstr = '';" ); /* Issue the Matlab command */ //engEvalString( sessions[sessionNumber].matEngine, tmpcommand ); /* Get a pointer to the output variable */ //matOut = engGetVariable( sessions[sessionNumber].matEngine, "outstr" ); //ST add socket message here bytecount = write(sessions[sessionNumber].lvSockfd,command,strlen(command)); if(bytecount < 0) dispErrorMessage("[query thread] ERROR writing to LabView socket"); if (verbose > 1) { printTime(); printf("\t%d bytes written to LabView",bytecount); } //ST replaced Matlab response handling code with LabView socket handling code bytecount = read(sessions[sessionNumber].lvSockfd,lvInBuffer,sizeof(lvInBuffer)); if (bytecount < 0) dispErrorMessage("[query thread] ERROR reading from LabView socket"); printTime(); printf("\tReceived %d bytes from Labview. 
Message: %s\n",bytecount,lvInBuffer); } time( &sessions[sessionNumber].tstamp ); pthread_mutex_unlock( &sessions[sessionNumber].sessionMutex ); strcpy(outbuf,lvInBuffer); if (verbose > 1) { printTime(); printf(" \t[query thread] trying to return string: %s\n", outbuf); } bytecount = send(qtp->sock,outbuf,strlen(outbuf)+1,0); if(bytecount != strlen(outbuf) + 1) dispErrorMessage("[query thread] ERROR sending LabView response back to Ensemble client"); shutdown( qtp->sock, SHUT_RDWR ); close( qtp->sock ); free( qtp ); } void *listenThread( void *params ) { ListenThreadParams *ltp = (ListenThreadParams *) params; SOCKET connectSock; struct sockaddr clientInfo; socklen_t clientInfoLength = sizeof( clientInfo ); int status; struct timeval tv; fd_set fdset; pthread_t threadHandle; pthread_attr_t threadAttr; QueryThreadParams *queryThreadParams; /* set attributes for queryThread - this should be small and fast */ pthread_attr_init( &threadAttr ); pthread_attr_setstacksize( &threadAttr, 100000 ); /* 100 kB */ /* returns thread resources immediately after termination but join is no longer possible in this state */ pthread_attr_setdetachstate( &threadAttr, PTHREAD_CREATE_DETACHED ); if( verbose > 0 ) printf( "\t[listener thread %d] listening on port %d\n", ltp->threadNumber, ltp->listenPort ); while( !shutdownSignal ) { /* must be reset every time */ FD_ZERO( &fdset ); FD_SET( ltp->listenSock, &fdset ); tv.tv_sec = 3; tv.tv_usec = 0; status = select( (ltp->listenSock)+1, &fdset, NULL, NULL, &tv ); if( status <= 0 ) { if( verbose > 2 ) { printTime(); printf( "\t[listener thread %d] select returned %d, continue\n", ltp->threadNumber, status ); } continue; } connectSock = accept( ltp->listenSock, (struct sockaddr *) &clientInfo, &clientInfoLength); if( connectSock < 0 ) { if( verbose > 0 ) { printTime(); printf( "\t[listener thread %d] Error calling socket accept()\n", ltp->threadNumber ); } break; } queryThreadParams = malloc( sizeof( QueryThreadParams ) ); 
queryThreadParams->sock = connectSock; queryThreadParams->port = ltp->listenPort; /* we're reusing the same thread handle over and over. doesn't really matter since query threads are disposable */ status = pthread_create( &threadHandle, &threadAttr, &queryThread, (void *) queryThreadParams ); if( status ) { if( verbose > 0 ) { printTime(); printf( "\t[listener thread %d] Error creating query thread: %d\n", ltp->threadNumber, status ); } break; } } shutdown( ltp->listenSock, SHUT_RDWR ); close( ltp->listenSock ); if( verbose > 0 ) { printTime(); printf( "\t[listener thread %d] exiting\n", ltp->threadNumber ); } free( ltp ); } void catchExitSignal( int sig ) { if( verbose > 0 ) { printTime(); printf( "shutting down\n" ); } shutdownServer(); if( verbose > 0 ) { printTime(); printf( "shutdown complete\n" ); } exit(0); } int main( int argc, char *argv[] ) { int x; if( argc < 4 ) { printf( "usage: lvrpc [verbose 0-3] [num listeners]\nverbose 0 means absolutely no output\n\n" ); exit(1); } startingPort = atoi( argv[1] ); lvHostname = (char *) malloc(strlen(argv[2]))+1; strcpy(lvHostname,argv[2]); lvPort = atoi( argv[3] ); verbose = 1; if( argc > 4 ) verbose = atoi( argv[4] ); if( verbose < 0 ) verbose = 0; if( verbose > 3 ) verbose = 3; numListenerThreads = 1; if( argc > 5 ) numListenerThreads = atoi( argv[5] ); if( numListenerThreads > MAX_LISTENERS || numListenerThreads < 1 ) { if( verbose > 0 ) { printf( "invalid number of listener threads\n" ); } return( -1 ); } signal( SIGINT, catchExitSignal ); signal( SIGQUIT, catchExitSignal ); signal( SIGTERM, catchExitSignal ); shutdownSignal = 0; pthread_mutex_init( &sessionArrayMutex, NULL ); for( x = 0; x < MAX_SESSIONS; x++ ) { sessions[x].lvSockfd = -1; pthread_mutex_init( &sessions[x].sessionMutex, NULL ); } int listenPort, status; ListenThreadParams *ltp; struct sockaddr_in sockInfo; int num_tries; for( x = 0; x < numListenerThreads; x++ ) { ltp = malloc( sizeof( ListenThreadParams ) ); ltp->threadNumber = x; 
ltp->listenPort = startingPort + x; sockInfo.sin_family = AF_INET; sockInfo.sin_addr.s_addr = htonl(INADDR_ANY); sockInfo.sin_port = htons(ltp->listenPort); ltp->listenSock = socket( AF_INET, SOCK_STREAM, 0 ); if( ltp->listenSock < 0 ) { if( verbose > 0 ) { printTime(); printf( "Error creating listener socket for thread: %d\n", x ); } return( -1 ); } status = -1; num_tries = 0; while ((status == -1) && (num_tries < MAX_SOCKET_BIND_ATTEMPTS)) { status = bind( ltp->listenSock, (struct sockaddr *) &sockInfo,sizeof(sockInfo)); num_tries++; } if ((num_tries > 1) && (verbose > 1)) { printTime(); printf("%d tries to bind socket\n", num_tries); } if( status < 0 ) { if( verbose > 0 ) { printTime(); printf( "Error %d binding socket\n", status ); } return( -1 ); } if( listen( ltp->listenSock, MAX_Q_CONNECTIONS ) < 0 ) { if( verbose > 0 ) { printTime(); printf( "Error calling socket listen()\n" ); } return( -1 ); } status = pthread_create( &threadHandles[x], NULL, &listenThread, (void *) ltp ); if( status ) { if( verbose > 0 ) { printTime(); printf( "Error creating listener [thread %d]: %d\n", x, status ); } return( -1 ); } } status = pthread_create( &threadHandles[numListenerThreads], NULL, &housekeepingThread, NULL ); if( status ) { if( verbose > 0 ) { printTime(); printf( "Error creating housekeeper thread: %d\n", status ); } return( -1 ); } if( verbose > 0 ) printf( "startup complete\n" ); while(1) sleep(100); return(0); } void shutdownServer( void ) { shutdownSignal = 1; int x, status; /* go to numlistenerThreads + 1 for HK thread */ for( x = 0; x < numListenerThreads+1; x++ ) { status = pthread_join(threadHandles[x], NULL ); if( status != 0 && verbose > 0 ) { printTime(); printf( "Error on thread %d join\n", x ); } } for( x = 0; x < MAX_SESSIONS; x++ ) { closeSession(x); pthread_mutex_destroy( &sessions[x].sessionMutex ); } pthread_mutex_destroy( &sessionArrayMutex ); } void closeSession( int s ) { int status; pthread_mutex_lock( &sessionArrayMutex ); if( 
sessions[s].lvSockfd >= 0 ) { pthread_mutex_lock( &sessions[s].sessionMutex ); if( verbose > 0 ) { printTime(); printf( "destroying matlab engine %d\n",s); } //ST Close the LabView socket here status = close(sessions[s].lvSockfd); if( status < 0 ) dispErrorMessage("[closeSession] ERROR: failed to close socket"); sessions[s].lvSockfd = -1; pthread_mutex_unlock( &sessions[s].sessionMutex ); } pthread_mutex_unlock( &sessionArrayMutex ); } void dispErrorMessage( char *errorMessage ) { if(verbose > 0) { printTime(); printf("\t%s",errorMessage); } }