/*********************************************************************************************************
 "Ensemble" is the proprietary property of The Regents of the University of California ("The Regents.")

 Copyright (c) 2005-10 The Regents of the University of California, Davis campus. All Rights Reserved.

 Redistribution and use in source and binary forms, with or without modification, are permitted by
 nonprofit, research institutions for research use only, provided the conditions in the included
 license agreement are met.

 Refer to the file "ensemble_license.txt" for the license agreement, located in the top level directory
 of this distribution.
**********************************************************************************************************/

/*
 *
 * Accepts socket connections and forwards commands to matlab
 * engines via matlab's C API, then returns matlab output over the socket.
 * Multi-threaded to handle multiple requests quickly, in parallel.
 *
 * The reason for this program is that matlab does not provide an API
 * for php, which is what we're using. This program is intended to be
 * a dumb relay between php (or any other client using tcp/ip sockets),
 * and the matlab engine.
 *
 * Listening port is defined at startup, and a single instance of this server
 * can listen on a range of ports simultaneously if required.
 *
 * A new socket connection must be made for each matlab command.
 * Socket commands are of the form "<sessionID>|<command>"
 * where <sessionID> is any alphanumeric string defined by the client
 * and <command> is an exact literal command passed directly to matlab.
 * Returned is the string value of the "outstr" variable in the Matlab workspace.
 * If this variable is not type string, an error (-1) will be returned.
 * jg this will be changed
 *
 * Sessions are implicitly created when a novel sessionID is encountered.
 * Each session maintains its own matlab engine (workspace), and
 * will destroy its matlab object after a timeout period of inactivity.
 * Sessions can also be explicitly closed by the client with this:
 * "<sessionID>|close_session", to free matlab license and resources.
 *
 * This is a standalone program that can be run as a daemon. All output
 * is sent to stdout, and verbose level can be adjusted via optional
 * startup argument (see usage:). Standard kill/shutdown signals are handled.
 *
 * Note: the matlab-specific build command in the Makefile
 *
 * Note: ld will look for "libeng.so" at runtime, so you might have to
 *       add the location of this matlab .so file to LD_LIBRARY_PATH
 *
 * Note: It is the client's responsibility to ensure that the "ans" matlab
 *       variable contains a string value after each call. Data type mismatches
 *       will return an error code (-1), and the matlab session will be closed.
 *
 * Rev:
 *  - created July 2005 by Jason Golubock for Janata lab, UC Davis
 * 07/19/05 PJ - added proper conversion of Matlab output variable to string
 * 09/18/05 PJ - added call to rehash toolbox, time-stamping during
 *               creation of new session in order to avoid accidental
 *               engine destruction.
 * 03/16/06 ST - Added support for session init and terminate control strings,
 *               made NULL and non-string results non-terminating and
 *               non-fatal conditions.
 * 03/28/06 PJ - modified calls to Matlab engine to return data into
 *               outstr rather than Matlab's "ans" variable
 * July 06 JG  - added support for different return data types (not just string)
 * 08/19/09 ST - added #include to avoid "implicit declaration" warnings with gcc 4.1.2
 * 09/18/09 ST - added sessionArrayMutex locks in housekeeping thread. Also moved individual session
 *               mutexes to the beginning and end of the for loop. Basically, housekeeping was finding
 *               engines that were just created, before a timestamp was added. It was then destroying
 *               these new engines. The mutexes were in place such that they found the new engine with
 *               an unassigned timestamp (default to year 1969), and then were basically waiting to destroy
 *               them.
* 09/22/09 ST - Moved sessionArrayMutex in housekeeping to just the block that closes the matlab engine. * It appears that sessionArrayMutexes should not be necessary after all, since the session * array is static, and sessions are created and destroyed only within the session struct. * So locks should only be necessary within each session struct. However, since matrpc has been * very stable up to now, we are opting for minor changes at this time. The solution will be * to replace all sessionArrayMutexes with the appropriate per-session mutex (if necessary). * * 1/8/10 ST - Defined several error codes and edited error handling code to return descriptive errors. * Added code to call Matlab 'lasterror' command to retrieve the last error thrown by Matlab * in the event that nothing was assigned to the output variable (this occurs when the Matlab * function crashed). * * * */ #include #include #include #include #include #include #include #include #include #include #include /* #include "/usr/local/matlab71/extern/include/engine.h" */ #include "/usr/local/matlab2009a/extern/include/engine.h" #define SOCKET short #define MAX_SESSIONS 60 /* max simultaneous matlab engines */ #define MAX_LISTENERS 10 /* each listener is assigned to a single port */ #define SESSION_TIMEOUT_SECONDS 9000 /* 2.5 hours */ #define MAX_Q_CONNECTIONS 1000 /* socket connect requests are queued */ #define MAX_SOCKET_BIND_ATTEMPTS 10 /* number of times to try binding to a socket */ /* control string constants */ #define CTRL_INIT_MATLAB_SESSION "ctrl:init_matlab_session" #define CTRL_TERMINATE_MATLAB_SESSION "ctrl:terminate_matlab_session" /*error messages - these should all start with the string 'ERROR' and provide an error code*/ #define ERROR_MATLAB_RETURNED_EMPTY_STRING "Error code -1: Matlab script returned empty string" #define ERROR_RETURN_VAL_NOT_STRING "Error code -2: Matlab script returned non-string" #define ERROR_MATLAB_SCRIPT_FAILED "Error code -3: 'outstr' not assigned. 
Matlab script probably resulted in error" #define ERROR_RETURN_VAL_EMPTY "Error code -4: matOut was NULL. There may have been a problem copying the result from the Matlab Engine." #define ERROR_NO_EMPTY_SESSIONS_AVAIL "Error code -5: No empty matrpc sessions available" #define ERROR_SOCKET_COMMAND_TOO_LONG "Error code -6: Received socket message was too long" #define ERROR_NO_SESSION_SEPARATOR_CHAR "Error code -7: Missing '|' character separater for session id" typedef struct { SOCKET sock; int port; } QueryThreadParams; typedef struct { SOCKET listenSock; int listenPort; int threadNumber; } ListenThreadParams; typedef struct { Engine *matEngine; pthread_mutex_t sessionMutex; time_t tstamp; char sessionID[128]; } Session; /* GLOBAL */ pthread_t threadHandles[MAX_LISTENERS + 1]; /* one more for HK thread */ Session sessions[MAX_SESSIONS]; pthread_mutex_t sessionArrayMutex; volatile int shutdownSignal; int startingPort; int numListenerThreads; int verbose; struct tm *tm; /* PROTOTYPE */ void closeSession( int ); void shutdownServer( void ); void printTime( void ) { time_t now; time( &now ); tm = localtime( &now ); printf( "%s", asctime( tm ) ); } void *housekeepingThread( void *params ) { if( verbose > 0 ) printf( "\t[housekeeping thread] on the job\n" ); int x; time_t now; while( !shutdownSignal ) { time( &now ); for( x = 0; x < MAX_SESSIONS; x++ ) { if( sessions[x].matEngine == NULL ) continue; /* locking just the session when we are checking for session timeout When closing the engine, locking the entire array. 
*/ pthread_mutex_lock( &sessions[x].sessionMutex ); if( now - sessions[x].tstamp > SESSION_TIMEOUT_SECONDS ) { pthread_mutex_lock( &sessionArrayMutex ); if( verbose > 0 ) { printTime(); printf( "\t[housekeeping] Matlab Engine %d has timestamp %s", x, ctime( &sessions[x].tstamp )); printf( "\t[housekeeping] Destroying matlab engine %d\n", x ); } engClose( sessions[x].matEngine ); sessions[x].matEngine = NULL; sessions[x].sessionID[0] = '\0'; pthread_mutex_unlock( &sessionArrayMutex ); } pthread_mutex_unlock( &sessions[x].sessionMutex ); } sleep( 5 * 60 ); /* sleep for 5 minutes */ } if( verbose > 0 ) { printTime(); printf( "\t[housekeeping thread] exiting\n" ); } } int getSessionNumber( const char sessionID[] ) { int sessionNumber; pthread_mutex_lock( &sessionArrayMutex ); /*this loop is one place in which a per-session mutex could replace the above sessionArrayMutex - S.T.*/ for( sessionNumber = 0; sessionNumber < MAX_SESSIONS; sessionNumber++ ) if( strcmp( sessions[sessionNumber].sessionID, sessionID ) == 0 ) break; /* this session ID does not yet exist in our list, or it does exist but its session has been closed */ if( sessionNumber == MAX_SESSIONS || sessions[sessionNumber].matEngine == NULL ) { /* find next empty slot */ for( sessionNumber = 0; sessionNumber < MAX_SESSIONS; sessionNumber++ ) if( sessions[sessionNumber].matEngine == NULL ) break; if( sessionNumber == MAX_SESSIONS ) { pthread_mutex_unlock( &sessionArrayMutex ); return( -1 ); } if( verbose > 0 ) { printTime(); printf("\t[query thread] Creating matlab eng %d for session ID %s\n", sessionNumber, sessionID ); } pthread_mutex_lock( &sessions[sessionNumber].sessionMutex ); strcpy( sessions[sessionNumber].sessionID, sessionID ); sessions[sessionNumber].matEngine=engOpen("matlab -nosplash -nodisplay -nojvm"); engEvalString( sessions[sessionNumber].matEngine, "rehash toolbox"); time( &sessions[sessionNumber].tstamp ); pthread_mutex_unlock( &sessions[sessionNumber].sessionMutex ); } pthread_mutex_unlock( 
&sessionArrayMutex ); return( sessionNumber ); } void *queryThread( void *params ) { QueryThreadParams *qtp = (QueryThreadParams *) params; int bufLength = 1023; char inbuf[bufLength+1]; char outbuf[bufLength+1]; int bytecount; bytecount = recv( qtp->sock, inbuf, bufLength, 0 ); if( bytecount >= bufLength ) { if( verbose > 0 ) { printTime(); printf( "\t[query thread] %s\n", ERROR_SOCKET_COMMAND_TOO_LONG ); } bytecount = send( qtp->sock, ERROR_SOCKET_COMMAND_TOO_LONG, strlen(ERROR_SOCKET_COMMAND_TOO_LONG)+1, 0 ); shutdown( qtp->sock, SHUT_RDWR ); close( qtp->sock ); free( qtp ); return; } inbuf[bytecount] = '\0'; if( verbose > 1 ) { printTime(); printf( "\t[query thread] port %d socket %d received: %s\n", qtp->port, qtp->sock, inbuf ); } /* parse out session ID and matlab command */ int x, pipePosition, sessionNumber; char *command; for( pipePosition = 0; pipePosition < bytecount; pipePosition++ ) if( (char) inbuf[pipePosition] == '|' ) { command = &(inbuf[pipePosition+1]); break; } if( pipePosition == bytecount ) { if( verbose > 0 ) { printTime(); printf( "\t[query thread] %s\n",ERROR_NO_SESSION_SEPARATOR_CHAR ); } bytecount = send( qtp->sock, ERROR_NO_SESSION_SEPARATOR_CHAR, strlen(ERROR_NO_SESSION_SEPARATOR_CHAR)+1, 0 ); shutdown( qtp->sock, SHUT_RDWR ); close( qtp->sock ); free( qtp ); return; } char sessionID[128]; strncpy( sessionID, inbuf, pipePosition ); sessionID[pipePosition] = '\0'; sessionNumber = getSessionNumber( sessionID ); if( sessionNumber < 0 ) { if( verbose > 0 ) { printTime(); printf( "\t[query thread] %s\n",ERROR_NO_EMPTY_SESSIONS_AVAIL ); printf( "\t[query thread] MAX_SESSIONS set to %d.\n",MAX_SESSIONS); } bytecount = send( qtp->sock, ERROR_NO_EMPTY_SESSIONS_AVAIL, strlen(ERROR_NO_EMPTY_SESSIONS_AVAIL)+1, 0 ); shutdown( qtp->sock, SHUT_RDWR ); close( qtp->sock ); free( qtp ); return; } /* incoming control string for initializing a matlab session */ /* session was created by getSessionNumber, so all we have to do is exit here */ 
if(strncmp(command,CTRL_INIT_MATLAB_SESSION,strlen(CTRL_INIT_MATLAB_SESSION))==0) { bytecount = send( qtp->sock, "ok", 2, 0 ); shutdown( qtp->sock, SHUT_RDWR ); close( qtp->sock ); free( qtp ); return; } /* incoming control string for terminating a matlab session */ if(strncmp(command,CTRL_TERMINATE_MATLAB_SESSION,strlen(CTRL_TERMINATE_MATLAB_SESSION))==0) { if( verbose > 0 ) { printTime(); printf("\t[query thread] Received terminate control, closing session %d\n",sessionNumber); } bytecount = send( qtp->sock, "ok", 2, 0 ); closeSession( sessionNumber ); shutdown( qtp->sock, SHUT_RDWR ); close( qtp->sock ); free( qtp ); return; } mxArray *matOut = NULL; if( shutdownSignal ) return;/*this isn't really necessary but might help */ /* prepend 'outstr=' to the command. This forces the output to be placed in this variable rather than in 'ans' */ char tmpcommand[bufLength+7]; strcpy(tmpcommand,"outstr="); strcat(tmpcommand, command); /* dispatch command to matlab */ pthread_mutex_lock( &sessions[sessionNumber].sessionMutex ); if( sessions[sessionNumber].matEngine != NULL ) { /*engOutputBuffer( sessions[sessionNumber].matEngine, outbuf, bufLength ); */ if (verbose > 1) { printTime(); printf("\t[query thread] sending command (%s) to matlab\n", tmpcommand); } /* Initializing outstr to 'OUTSTR_NOT_ASSIGNED' so that we can check if assignment was successful*/ engEvalString( sessions[sessionNumber].matEngine, "outstr = 'OUTSTR_NOT_ASSIGNED';" ); /* Issue the Matlab command */ engEvalString( sessions[sessionNumber].matEngine, tmpcommand ); /* Get a pointer to the output variable */ matOut = engGetVariable( sessions[sessionNumber].matEngine, "outstr" ); if (verbose > 1) { printTime(); printf("\t[query thread] matOut address: %x\n", matOut); } } time( &sessions[sessionNumber].tstamp ); pthread_mutex_unlock( &sessions[sessionNumber].sessionMutex ); /*determine if matOut format was acceptable*/ char errorCode[bufLength + 1]; int matOutFormatOK = 1; if(matOut == NULL) { 
strcpy(errorCode,ERROR_RETURN_VAL_EMPTY); matOutFormatOK = 0; } else if(!mxIsChar(matOut)) { strcpy(errorCode,ERROR_RETURN_VAL_NOT_STRING); matOutFormatOK = 0; } /*if matOutFormat was OK, assign string to outbuf, then see if the string was properly assigned*/ if(matOutFormatOK) { /* Deal with copying the actual data from the Matlab workspace variable to the output variable */ char *outstr=NULL; outstr=mxArrayToString(matOut); strcpy(outbuf,outstr); mxFree(outstr); /*see if the string was assigned*/ if(strcmp(outbuf,"OUTSTR_NOT_ASSIGNED") == 0) { matOutFormatOK = 0; /*get lasterr from matlab. This will be passed back to sender. Then reset the lasterror info */ char errorMsg[bufLength + 1]; engEvalString(sessions[sessionNumber].matEngine, "errorMsg = lasterror; errorMsg = errorMsg.message; lasterror('reset');"); matOut = engGetVariable( sessions[sessionNumber].matEngine, "errorMsg" ); outstr = mxArrayToString(matOut); strcpy(errorMsg,outstr); mxFree(outstr); sprintf(errorCode,"%s\nResult from Matlab lasterr:\n%s",ERROR_MATLAB_SCRIPT_FAILED,errorMsg); engEvalString( sessions[sessionNumber].matEngine,"clear errorMsg;"); /*clear errorMsg variable*/ } else if(strcmp(outbuf,"") == 0) { matOutFormatOK = 0; strcpy(errorCode,ERROR_MATLAB_RETURNED_EMPTY_STRING); } } /*if matOutFormat was not acceptable, log the error and return error code*/ if( !matOutFormatOK ) { if( verbose > 0 ) { printTime(); printf("\t[query thread] %s\n",errorCode ); } bytecount = send( qtp->sock, errorCode , strlen(errorCode)+1, 0 ); shutdown( qtp->sock, SHUT_RDWR ); close( qtp->sock ); free( qtp ); return; } /* send matlab response back to client */ if (verbose > 1) { printTime(); printf(" \t[query thread] trying to return string: %s\n", outbuf); } bytecount = send( qtp->sock, outbuf, strlen(outbuf)+1, 0 ); if( bytecount != strlen( outbuf )+1 && verbose > 0 ) { printTime(); printf( "\t[query thread] error sending matlab response over socket\n" ); } /* Matlab output control string for terminating a 
session */ if(strncmp(outbuf,CTRL_TERMINATE_MATLAB_SESSION,strlen(CTRL_TERMINATE_MATLAB_SESSION)) == 0) closeSession( sessionNumber ); shutdown( qtp->sock, SHUT_RDWR ); close( qtp->sock ); free( qtp ); } void *listenThread( void *params ) { ListenThreadParams *ltp = (ListenThreadParams *) params; SOCKET connectSock; struct sockaddr clientInfo; socklen_t clientInfoLength = sizeof( clientInfo ); int status; struct timeval tv; fd_set fdset; pthread_t threadHandle; pthread_attr_t threadAttr; QueryThreadParams *queryThreadParams; /* set attributes for queryThread - this should be small and fast */ pthread_attr_init( &threadAttr ); pthread_attr_setstacksize( &threadAttr, 100000 ); /* 100 kB */ /* returns thread resources immediately after termination but join is no longer possible in this state */ pthread_attr_setdetachstate( &threadAttr, PTHREAD_CREATE_DETACHED ); if( verbose > 0 ) printf( "\t[listener thread %d] listening on port %d\n", ltp->threadNumber, ltp->listenPort ); while( !shutdownSignal ) { /* must be reset every time */ FD_ZERO( &fdset ); FD_SET( ltp->listenSock, &fdset ); tv.tv_sec = 3; tv.tv_usec = 0; status = select( (ltp->listenSock)+1, &fdset, NULL, NULL, &tv ); if( status <= 0 ) { if( verbose > 2 ) { printTime(); printf( "\t[listener thread %d] select returned %d, continue\n", ltp->threadNumber, status ); } continue; } connectSock = accept( ltp->listenSock, (struct sockaddr *) &clientInfo, &clientInfoLength); if( connectSock < 0 ) { if( verbose > 0 ) { printTime(); printf( "\t[listener thread %d] Error calling socket accept()\n", ltp->threadNumber ); } break; } queryThreadParams = malloc( sizeof( QueryThreadParams ) ); queryThreadParams->sock = connectSock; queryThreadParams->port = ltp->listenPort; /* we're reusing the same thread handle over and over. 
doesn't really matter since query threads are disposable */ status = pthread_create( &threadHandle, &threadAttr, &queryThread, (void *) queryThreadParams ); if( status ) { if( verbose > 0 ) { printTime(); printf( "\t[listener thread %d] Error creating query thread: %d\n", ltp->threadNumber, status ); } break; } } shutdown( ltp->listenSock, SHUT_RDWR ); close( ltp->listenSock ); if( verbose > 0 ) { printTime(); printf( "\t[listener thread %d] exiting\n", ltp->threadNumber ); } free( ltp ); } void catchExitSignal( int sig ) { if( verbose > 0 ) { printTime(); printf( "shutting down\n" ); } shutdownServer(); if( verbose > 0 ) { printTime(); printf( "shutdown complete\n" ); } exit(0); } int main( int argc, char *argv[] ) { int x; if( argc < 2 ) { printf( "usage: matrpc [verbose 0-3] [num listeners]\nverbose 0 means absolutely no output\n\n" ); exit(1); } startingPort = atoi( argv[1] ); verbose = 1; if( argc > 2 ) verbose = atoi( argv[2] ); if( verbose < 0 ) verbose = 0; if( verbose > 3 ) verbose = 3; numListenerThreads = 1; if( argc > 3 ) numListenerThreads = atoi( argv[3] ); if( numListenerThreads > MAX_LISTENERS || numListenerThreads < 1 ) { if( verbose > 0 ) { printf( "invalid number of listener threads\n" ); } return( -1 ); } signal( SIGINT, catchExitSignal ); signal( SIGQUIT, catchExitSignal ); signal( SIGTERM, catchExitSignal ); shutdownSignal = 0; pthread_mutex_init( &sessionArrayMutex, NULL ); for( x = 0; x < MAX_SESSIONS; x++ ) { /* session is tracked by whether matEngine == NULL */ sessions[x].matEngine = NULL; pthread_mutex_init( &sessions[x].sessionMutex, NULL ); } int listenPort, status; ListenThreadParams *ltp; struct sockaddr_in sockInfo; int num_tries; for( x = 0; x < numListenerThreads; x++ ) { ltp = malloc( sizeof( ListenThreadParams ) ); ltp->threadNumber = x; ltp->listenPort = startingPort + x; sockInfo.sin_family = AF_INET; sockInfo.sin_addr.s_addr = htonl(INADDR_ANY); sockInfo.sin_port = htons(ltp->listenPort); ltp->listenSock = socket( AF_INET, 
SOCK_STREAM, 0 ); if( ltp->listenSock < 0 ) { if( verbose > 0 ) { printTime(); printf( "Error creating listener socket for thread: %d\n", x ); } return( -1 ); } status = -1; num_tries = 0; while ((status == -1) && (num_tries < MAX_SOCKET_BIND_ATTEMPTS)) { status = bind( ltp->listenSock, (struct sockaddr *) &sockInfo,sizeof(sockInfo)); num_tries++; } if ((num_tries > 1) && (verbose > 1)) { printTime(); printf("%d tries to bind socket\n", num_tries); } if( status < 0 ) { if( verbose > 0 ) { printTime(); printf( "Error %d binding socket\n", status ); } return( -1 ); } if( listen( ltp->listenSock, MAX_Q_CONNECTIONS ) < 0 ) { if( verbose > 0 ) { printTime(); printf( "Error calling socket listen()\n" ); } return( -1 ); } status = pthread_create( &threadHandles[x], NULL, &listenThread, (void *) ltp ); if( status ) { if( verbose > 0 ) { printTime(); printf( "Error creating listener [thread %d]: %d\n", x, status ); } return( -1 ); } } status = pthread_create( &threadHandles[numListenerThreads], NULL, &housekeepingThread, NULL ); if( status ) { if( verbose > 0 ) { printTime(); printf( "Error creating housekeeper thread: %d\n", status ); } return( -1 ); } if( verbose > 0 ) printf( "startup complete\n" ); /* do nothing */ while(1) sleep(100); return(0); } void shutdownServer( void ) { shutdownSignal = 1; int x, status; /* go to numlistenerThreads + 1 for HK thread */ for( x = 0; x < numListenerThreads+1; x++ ) { status = pthread_join(threadHandles[x], NULL ); if( status != 0 && verbose > 0 ) { printTime(); printf( "Error on thread %d join\n", x ); } } for( x = 0; x < MAX_SESSIONS; x++ ) { closeSession(x); pthread_mutex_destroy( &sessions[x].sessionMutex ); } pthread_mutex_destroy( &sessionArrayMutex ); } void closeSession( int s ) { pthread_mutex_lock( &sessionArrayMutex ); if( sessions[s].matEngine != NULL ) { pthread_mutex_lock( &sessions[s].sessionMutex ); if( verbose > 0 ) { printTime(); printf( "\tdestroying matlab engine %d\n",s); } engClose( sessions[s].matEngine ); 
sessions[s].matEngine = NULL; pthread_mutex_unlock( &sessions[s].sessionMutex ); } pthread_mutex_unlock( &sessionArrayMutex ); }