Home Reference Source

src/server.js

/*
 * Kurento client that connects to Asterisk!
 *
 * Sar Champagne ([email protected])
 */
 
/*
 * Imports of internal dependencies.
 */
const config = require('./configuration.js');
const registry = require('./registry.js');
const users = require('./sipUsers.js');

/*
 * Imports of external dependencies.
 */
const crypto = require('crypto');
const Kurento = require('kurento-client');
const NodeWS = require('jssip-node-websocket');
const SDES = Kurento.getComplexType('SDES');
const SDP = require('sdp-transform');
const SIP = require('jssip');

/*
 * Definition of global variables.
 */
// Registry of UserSession objects; this file calls register(), getByName() and getById() on it.
var userRegistry = new registry.UserRegistry();
// Pool of SIP credentials; this file calls getSipUser() and releaseSipUser() on it.
var sipUserPool = new users.SipUserPool();
// Cached Kurento client; stays null until getKurentoClient() connects successfully.
var kurentoClient = null;
// Passed to register() as the registry id of the UA started at load time.
var userCount = 1;

/*
 * Debug variables.
 */
// NOTE(review): not referenced anywhere else in the visible source — confirm before removing.
var pipelineState = '';

/*
 * Constructor for the per-user state record kept in the user registry.
 *
 * id   - registry id of this session
 * ext  - SIP extension the UA registered with
 * pass - SIP password for that extension
 * ua   - the JSSIP User Agent serving this extension
 *
 * Everything else starts out as null and is filled in while a call is
 * being set up (SDP strings, RTC session, Kurento pipeline and endpoints,
 * detected recording media profile).
 */
function UserSession(id, ext, pass, ua) {
    // Values supplied by the caller.
    Object.assign(this, { id, ext, pass, ua });

    // Call state, populated later.
    Object.assign(this, {
        sdpOffer: null,
        sdpAnswer: null,
        session: null,
        pipeline: null,
        rtpEndpoint: null,
        recorderEndpoint: null,
        incomingMediaProfile: null,
    });
}

/*
 * Start the program!
 * Takes one set of SIP credentials from the pool and spawns a JSSIP UA
 * that registers with them and then listens for an incoming call.
 */
var user = sipUserPool.getSipUser();
// userCount is used as the registry id for this UA; no callback is supplied.
register(userCount, user.sipId, user.sipPass);

// ********************** METHOD DEFINITIONS ********************************

/*
 * Spawns a JSSIP UA to listen for incoming calls.
 *
 * Builds the secure websocket to the SIP server, creates and starts the
 * User Agent, stores a UserSession for it in the local registry and
 * installs all JSSIP UA event handlers.
 *
 * id       - registry id stored on the UserSession and handed to the handlers
 * ext      - SIP extension to register as
 * password - SIP password for that extension
 * callback - accepted but never invoked by this function
 *
 * A missing or already-registered extension is only reported through
 * debuglog(); nothing is thrown or returned.
 */
function register(id, ext, password, callback) {
    debuglog("Starting...");

    // Reports a rejected registration attempt through the debug log.
    const onError = (error) => {
        debuglog(JSON.stringify({id:'registerResponse', response : 'rejected ', message: error}));
    };

    if (!ext) {
        return onError("empty user ext");
    }
    if (userRegistry.getByName(ext)) {
        return onError("Server user " + ext + " is already registered");
    }

    // Websocket transport used by the UA for SIP messaging.
    const wsUri = `wss://${config.sipServer}:${config.sipWsPort}/ws`;
    debuglog("Websocket to: " + wsUri);
    const requestOptions = {
        ciphers: "ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384",
    };
    const socket = new NodeWS(wsUri, { requestOptions });

    // JSSIP UA settings.
    const uaSettings = {
        sockets: [ socket ],
        uri: `sip:${ext}@${config.sipServer}`,
        password,
        registrar_server: `sip:${config.sipServer}`,
        register: true,
        // Deliberately returns an empty object; required by the WebRTCVentures library modifications.
        rtc: () => ({}),
    };
    debuglog("Registering to: " + uaSettings.uri);
    const agent = new SIP.UA(uaSettings);
    agent.start();

    // Make the session discoverable through the local user registry.
    userRegistry.register(new UserSession(id, ext, password, agent));

    // --------------------- EVENT HANDLERS FOR JSSIP UA -----------------------

    // Registration succeeded.
    agent.on('registered', () => {
        log(ext, 'SIP UA registered.');
        debuglog(`-- UA registration status: ${agent.isRegistered()}`);
        debuglog(`-- UA connection status: ${agent.isConnected()}`);
    });

    // Registration failed: this is fatal for the process.
    agent.on('registrationFailed', () => {
        debuglog(`SIP client could not be registered for ${ext}`);
        stopAndExit(-1, id, "SIP Registration Failed");
    });

    // The SIP UA has disconnected: also fatal.
    // NOTE(review): stock JsSIP emits 'disconnected'; confirm the modified
    // library in use really emits 'disconnect', otherwise this never fires.
    agent.on('disconnect', () => {
        debuglog(`${id}:${ext} user agent disconnected`);
        stopAndExit(-2, id, "SIP UA Disconnected Unexpectedly");
    });

    // A new call invite has been received.
    agent.on('newRTCSession', (data) => {
        newIncomingCall(id, ext, data);
    });
    // ------------------- END EVENT HANDLERS (JSSIP UA) -------------------------------
}

/*
 * Initialize an incoming call!
 *
 * Stores the new RTCSession on the registered UserSession, asks KMS (through
 * connectIncomingCall) to build a pipeline and an answer SDP, answers the
 * call with that SDP, and installs the RTCSession event handlers.
 *
 * id   - registry id of the UserSession that received the invite
 * ext  - SIP extension, used only for logging
 * data - JSSIP 'newRTCSession' payload; data.session and data.request.body
 *        (the offer SDP) are read
 */
function newIncomingCall(id, ext, data) {
    log(ext, 'Initial invite received');

    const callee = userRegistry.getById(id);
    if (!callee) {
        // Without a registered session there is nowhere to keep the call state.
        debuglog(`newIncomingCall: no registered user session for id ${id}`);
        return;
    }
    callee.session = data.session;
    // The call is answered below, once KMS has produced the answer SDP.

    //Automatically answer incoming calls.
    //Constructs a Kurento pipeline, then uses Kurento to generate a
    //response SDP for the JSSIP Session to send back.
    connectIncomingCall(id, data.request.body, function (error) {
        if (error) {
            debuglog('connectIncomingCall received error: ' + error);
        }
        else {
            //KMS has successfully started a pipeline and generated a return SDP.
            //Here we send that return SDP back to the caller.
            var options = {
                rtcAnswerConstraints: callee.sdpAnswer
            };
            callee.session.answer(options);
            debuglog('Initial call processed.'); //At this point, we are just waiting for events to trigger.
        }
    });

    // Make a new user for next session!
    //var newUser = sipUserPool.getSipUser();
    //userCount++;
    //register(userCount, newUser.sipId, newUser.sipPass);

    // --------------------- EVENT HANDLERS FOR JSSIP RTCSESSION -----------------------
    // Event Handler: A reinvite has been received.
    callee.session.on('reinvite', () => {
        debuglog('Reinvite received, automatically responding.');
        callee.session.respondReinvite();
    });

    // Event Handler: A dtmf has been received.
    // Currently just prints out the DTMF and does nothing.
    callee.session.on('newDTMF', (dtmfEvent) => {
        debuglog('NEW DTMF from ' + dtmfEvent.originator + ': ' + dtmfEvent.dtmf.tone);
    });

    // Triggers when the RTCSession is ended, and shuts down the program
    // once the recorder (if one was created) has stopped.
    callee.session.on('ended', () => {
        log(ext, 'SIP Conversation ended.');
        // Wait for the recorder to stop gracefully.
        if (callee.recorderEndpoint) {
            // A promise reports failure by rejecting, not through the
            // fulfilment value, so success and failure get separate handlers.
            callee.recorderEndpoint.stopAndWait().then(
                () => {
                    debuglog('Recording stopped.');

                    // Now stop the program completely.
                    stopAndExit(0, id); //0 for OK exit.
                },
                (error) => {
                    debuglog('Error stopping recorder!');
                    debuglog(error);
                    stopAndExit(-4, id, "Error Stopping Recorder");
                }
            );
        }
        callee.ua.unregister();
        sipUserPool.releaseSipUser({sipId: callee.ext, sipPass: callee.pass});
    });
    // -------------------- END EVENT HANDLERS (JSSIP RTCSession) ----------------------------
}

/*
 * Connects the incoming call to KMS.  Performs several important functions:
 *      - Creates the media pipeline
 *      - Creates the endpoints: a RtpEndpoint, a RecorderEndpoint, and the PlayerEndpoint(s)
 *      - Installs the event handlers for the RtpEndpoint and RecorderEndpoint
 *      - Connects the endpoints
 *      - Uses the RtpEndpoint and the KMS to process the incoming SDP offer and generate an answer SDP.
 *          - NOTE: this process also configures the RtpEndpoint.
 *
 * id       - registry id of the UserSession handling the call
 * sdpOffer - offer SDP text taken from the incoming invite
 * callback - callback(error) on failure, callback(null, sdpAnswer) on success.
 *            sdpAnswer is the answer exactly as returned by KMS; the answer
 *            adjusted by config.modifySdpOutgoing is stored on callee.sdpAnswer.
 *
 * Once the pipeline exists, every failure path releases it before reporting
 * the error, so a failed call setup does not leave a pipeline behind on KMS.
 */
function connectIncomingCall(id, sdpOffer, callback) {
    // Releases the pipeline, then hands the error to the caller.
    function abort(pipeline, error) {
        pipeline.release();
        return callback(error);
    }

    // If this fails, then we could not access the KMS.
    debuglog("Getting client.");
    getKurentoClient(function(error, kurentoClient) {
        if (error) {
            return callback(error);
        }
        debuglog('Got Kurento clientr');

        // First we create the pipeline.
        kurentoClient.create('MediaPipeline', function(error, pipeline) {
            if (error) {
                return callback(error);
            }

            // Pipeline creation successful, save to field.
            const callee = userRegistry.getById(id);
            if (!callee) {
                return abort(pipeline, new Error(`connectIncomingCall: no registered user session for id ${id}`));
            }
            callee.pipeline = pipeline;

            // Now, check the incoming sdp to determine the correct incomingMediaProfile
            callee.incomingMediaProfile = getMediaProfile(sdpOffer, id);
            log(callee.ext, "Detected Incoming Media Profile: " + callee.incomingMediaProfile);

            // Create the endpoints.
            createMediaElements(id, pipeline, function(error, rtpEndpoint, recorderEndpoint, playerEndpoint, playerEndpointRec) {
                if (error) {
                    // Any elements created before the failure live inside the pipeline.
                    return abort(pipeline, error);
                }

                //Register to the User object.
                callee.rtpEndpoint = rtpEndpoint;
                callee.recorderEndpoint = recorderEndpoint;

                //Set Event Handlers for RtpEndpoint
                rtpEvents(callee, rtpEndpoint, recorderEndpoint, playerEndpoint);
                //Set Event Handlers for RecorderEndpoint
                recorderEvents(callee, recorderEndpoint, rtpEndpoint, playerEndpoint, playerEndpointRec);

                // Call the function to connect the media components (endpoints).
                connectMediaElements(rtpEndpoint, recorderEndpoint, playerEndpoint, function(error) {
                    if (error) {
                        return abort(pipeline, error);
                    }

                    // Now the endpoints are connected!
                    debuglog("Offer SDP:\n" + sdpOffer);
                    //Modify the incoming SDP (configuration.js)
                    const modifiedOffer = config.modifySdpIncoming(sdpOffer);

                    //The user doesn't actually use this again, but it can't hurt to store it.
                    callee.sdpOffer = modifiedOffer;

                    //Use the RtpEndpoint to process the received offer, configuring the endpoint in the process.
                    rtpEndpoint.processOffer(modifiedOffer, function(error, sdpAnswer) {
                        if (error) {
                            debuglog('CIC error 5');
                            return abort(pipeline, error);
                        }

                        // Modify the outgoing SDP (configuration.js)
                        callee.sdpAnswer = config.modifySdpOutgoing(modifiedOffer, sdpAnswer);
                        debuglog('Answer SDP:\n' + callee.sdpAnswer);

                        //All done setting up!  Control returns to the caller's callback.
                        return callback(null, sdpAnswer);
                    });
                });
            });
        });
    });
}

/*
 * Define the Event Handlers for the RTP Events.  Most just print out
 * info for debugging, but some trigger important things.
 *
 * |    Important Event     | Required Value |                    Effect                     |
 * |------------------------|----------------|-----------------------------------------------|
 * | ConnectionStateChanged | none           | Begin video playback from firstPlayerEndpoint |
 * | MediaStateChanged      | CONNECTED      | Begin recording with recorderEndpoint         |
 *
 * callee              - UserSession of the call; only callee.ext is read (for logging)
 * rtpEndpoint         - endpoint the handlers are attached to
 * recorderEndpoint    - recorder started when media becomes CONNECTED
 * firstPlayerEndpoint - player started on every ConnectionStateChanged event
 */
function rtpEvents(callee, rtpEndpoint, recorderEndpoint, firstPlayerEndpoint) {
    rtpEndpoint.on('Error', (error) => {
        debuglog("RTPEndpoint: Error: " + error);
    });

    rtpEndpoint.on('ConnectionStateChanged', () => {
        debuglog("RTPEndpoint: Connection state changed!");

        log(callee.ext, "Starting playback!");
        startPlayerEndpoint(firstPlayerEndpoint);
    });

    rtpEndpoint.on('MediaStateChanged', (state) => {
        debuglog("***********************\nRTPEndpoint: Media state changed FROM " + state.oldState + " TO " + state.newState + " \n***********************");
        const becameConnected = state.oldState !== state.newState && state.newState === "CONNECTED";
        if (becameConnected) {
            // Report a failed record() instead of leaving the rejection unhandled.
            recorderEndpoint.record().then(
                () => {
                    log(callee.ext, "Starting recorder...");
                },
                (error) => {
                    debuglog("RecorderEndpoint record() Error: " + error);
                }
            );
        }
    });

    rtpEndpoint.on('MediaFlowInStateChange', (param) => {
        debuglog('RTPEndpoint: MediaFlowInStateChange: ' + param.state);
    });

    rtpEndpoint.on('MediaSessionStarted', () => {
        debuglog("RTPEndpoint: Media session started!");
    });

    rtpEndpoint.on('MediaSessionTerminated', () => {
        debuglog("RTPEndpoint: Media session termintated!");
    });
}

/*
 * Installs the event handlers for the RecorderEndpoint.  All but one of
 * them only write debug output.
 *
 * 'Recording' is the important one: it stops the player that is currently
 * feeding the RtpEndpoint, switches the RtpEndpoint's source over to
 * playerEndpointDuringRec and starts that player, to visually inform the
 * user that they are being recorded.
 *
 * callee                  - UserSession of the call; only callee.ext is read (for logging)
 * recorderEndpoint        - endpoint the handlers are attached to
 * rtpEndpoint             - endpoint whose source player gets switched
 * currentPlayerEndpoint   - player active before recording starts
 * playerEndpointDuringRec - player shown while recording
 */
function recorderEvents(callee, recorderEndpoint, rtpEndpoint, currentPlayerEndpoint, playerEndpointDuringRec) {
    // Swap the intro player for the "recording" player.
    const onRecording = () => {
        log(callee.ext, "Recorder started successfully.  Recording now.");
        currentPlayerEndpoint.stop();
        switchPlayers(rtpEndpoint, currentPlayerEndpoint, playerEndpointDuringRec, () => {
            startPlayerEndpoint(playerEndpointDuringRec);
        });
    };

    const onPaused = () => {
        debuglog("Recorder: PAUSED");
    };

    const onStopped = () => {
        debuglog("Recorder: STOPPED IN MY PROVERBIAL TRACKS");
    };

    const onUriStateChanged = (state) => {
        debuglog("Recorder: STATE changed to:");
        debuglog(state);
    };

    const onFlowInChange = (param) => {
        debuglog('Recorder: MediaFlowInStateChange to ' + param.state);
    };

    recorderEndpoint.on('Recording', onRecording);
    recorderEndpoint.on('Paused', onPaused);
    recorderEndpoint.on('Stopped', onStopped);
    recorderEndpoint.on('UriEndpointStateChanged', onUriStateChanged);
    recorderEndpoint.on('MediaFlowInStateChange', onFlowInChange);
}

/*
 * Stop everything and exit the program in a somewhat graceful fashion.
 *
 * exitCode - status the process exits with; if exitCode < 0, this was an
 *            erroneous exit and the reason is logged
 * id       - registry id of the UserSession to tear down (calls are
 *            terminated, the UA unregistered, the pipeline released)
 * reason   - human-readable cause, only logged for negative exit codes
 *
 * Never returns: the process is terminated at the end.
 */
function stopAndExit(exitCode, id, reason) {
    if (exitCode < 0) {
        log("Fatal error CODE " + exitCode, reason);
    }

    const callee = userRegistry.getById(id);
    if (callee) {
        //Hangup
        log(callee.ext, 'Ending calls...');
        callee.ua.terminateSessions();
        //Unregister
        log(callee.ext, 'Unregistering SIP UA...');
        callee.ua.unregister();
        //Close pipeline if existent
        if (callee.pipeline) {
            log(callee.ext, 'Releasing pipeline...');
            // NOTE(review): release() is not awaited before exiting — confirm the request reliably reaches KMS.
            callee.pipeline.release();
        }
    }

    log('stopAndExit()', 'Exiting...');
    // Propagate the status so the parent process can tell failures from clean exits.
    process.exit(exitCode);
}

// *************************** HELPER METHODS LIVE HERE *********************************

/*
 * Generate a key to use for SRTP.
 *
 * size - number of characters to produce (default 30)
 *
 * Each character is derived from one random byte, mapped into the
 * character codes 32..127.  Returns the characters joined into a string.
 */
function generateKey(size = 30) {
    const randomOctets = crypto.randomBytes(size);
    const toKeyChar = (octet) => String.fromCharCode(32 + (octet % 96));
    return Array.from(randomOctets, toKeyChar).join('');
}

/*
 * Hands the caller a Kurento client, i.e. our handle for talking to the
 * Kurento Media Server (KMS).  The client is created on first use and
 * cached in the module-level kurentoClient variable afterwards.
 *
 * callback - callback(null, client) on success, or callback(message) with
 *            a descriptive string when the media server cannot be reached
 *
 * IMPORTANT: If this hangs forever, it's a KMS connection issue.  Is KMS offline?
 */
function getKurentoClient(callback) {
    // Fast path: a connection was already established earlier.
    if (kurentoClient !== null) {
        return callback(null, kurentoClient);
    }

    debuglog("Going to Kurento....");
    Kurento(config.kurentoServer, (error, connectedClient) => {
        if (!error) {
            debuglog("Established connection to KMS.");
            kurentoClient = connectedClient;
            callback(null, kurentoClient);
            return;
        }

        const message = 'Could not find media server at address ' + config.kurentoServer;
        return callback(message + ". Exiting with error " + error);
    });
}

/*
 * Create the Endpoints that form the ends of our simple pipeline.
 * Pipeline:
 *      playerEndpointIntro -> rtpEndpoint -> recorderEndpoint
 *      playerEndpointRec (not connected, used later)
 *
 * id       - registry id of the UserSession; its ext (for logging) and
 *            incomingMediaProfile (for the recorder) are read
 * pipeline - Kurento MediaPipeline the elements are created in
 * callback - callback(error) on the first failure, otherwise
 *            callback(null, rtpEndpoint, recorderEndpoint,
 *                     playerEndpointIntro, playerEndpointRec)
 *
 * The elements are created one after another; nothing is connected here.
 */
function createMediaElements(id, pipeline, callback) {
    const callee = userRegistry.getById(id);
    if (!callee) {
        return callback(new Error(`createMediaElements: no registered user session for id ${id}`));
    }

    //First, create the RtpEndpoint (SDES-encrypted, fresh key per call).
    const rtpParams = {
        mediaPipeline: pipeline,
        crypto: new SDES({
            crypto: "AES_128_CM_HMAC_SHA1_80",
            key: generateKey()
        })
    };
    pipeline.create('RtpEndpoint', rtpParams, function(error, rtpEndpoint) {
        if (error) {
            debuglog('createMediaElements 1');
            return callback(error);
        }

        //Parameters for the RecorderEndpoint; the file name carries a timestamp.
        const date = getTimestampString();
        const recordParams = {
            mediaPipeline: pipeline,
            mediaProfile: callee.incomingMediaProfile,
            stopOnEndOfStream: true,
            uri: config.path + 'recordings/rec_' + date + '.' + config.recordingFormat
        };

        //RecorderEndpoint creation
        pipeline.create('RecorderEndpoint', recordParams, function(error, recorderEndpoint) {
            if (error) {
                debuglog('createMediaElements 2');
                return callback(error);
            }
            log(callee.ext, 'Recording to: ' + recordParams.uri);

            //PlayerEndpoint for the intro media.
            const playerParams = {
                mediaPipeline: pipeline,
                uri: config.path + 'media/' + config.playFileIntro,
                useEncodedMedia: config.playEncodedMedia
            };
            pipeline.create('PlayerEndpoint', playerParams, function(error, playerEndpointIntro) {
                if (error) {
                    // Distinct tag per step so the failing element can be identified.
                    debuglog('createMediaElements 3');
                    return callback(error);
                }
                log(callee.ext, "Playing from: " + playerParams.uri);

                //PlayerEndpoint for the media shown while recording.
                const playerParamsRec = {
                    mediaPipeline: pipeline,
                    uri: config.path + 'media/' + config.playFileRec,
                    useEncodedMedia: config.playEncodedMedia
                };
                pipeline.create('PlayerEndpoint', playerParamsRec, function(error, playerEndpointRec) {
                    if (error) {
                        debuglog('createMediaElements 4');
                        return callback(error);
                    }
                    log(callee.ext, "Playing from: " + playerParamsRec.uri);

                    return callback(null, rtpEndpoint, recorderEndpoint, playerEndpointIntro, playerEndpointRec);
                });
            });
        });
    });
}


/*
 * Connect the three endpoints!
 * Pipeline Structure:
 *      PlayerEndpoint -> RtpEndpoint -> RecorderEndpoint
 *
 * rtpEndpoint      - receives the player's media and feeds the recorder
 * recorderEndpoint - sink for the RtpEndpoint's AUDIO and VIDEO
 * playerEndpoint   - source of the AUDIO and VIDEO sent to the RtpEndpoint
 * callback         - callback(null) once all four connections have been
 *                    made, or callback(error) with the first failure
 *
 * The four connect requests are issued in the order listed; the callback
 * runs only after every one of them has settled successfully (or one failed).
 */
function connectMediaElements(rtpEndpoint, recorderEndpoint, playerEndpoint, callback) {
    const pendingConnections = [
        rtpEndpoint.connect(recorderEndpoint, 'AUDIO'),
        rtpEndpoint.connect(recorderEndpoint, 'VIDEO'),
        playerEndpoint.connect(rtpEndpoint, 'AUDIO'),
        playerEndpoint.connect(rtpEndpoint, 'VIDEO'),
    ];

    // Two-argument then(): an exception thrown by callback(null) must not
    // trigger a second, failing invocation of the callback.
    Promise.all(pendingConnections).then(
        () => callback(null),
        (error) => callback(error)
    );
}

/*
 * Kicks off playback on the given playerEndpoint.
 * A play() failure is written to the debug log; on success the endpoint's
 * video info is requested and either it or the lookup error is logged.
 */
function startPlayerEndpoint(playerEndpoint) {
    // Logs the outcome of the getVideoInfo() request.
    const onVideoInfo = (error, result) => {
        if (error) {
            debuglog("PlayerEndpoint getVideoInfo() Error: " + error);
            return;
        }
        debuglog("PlayerEndpoint getVideoInfo():");
        debuglog(result);
    };

    playerEndpoint.play((error) => {
        if (error) {
            debuglog("PlayerEndpoint play() Error: " + error);
            return;
        }
        playerEndpoint.getVideoInfo(onVideoInfo);
    });
}

/*
 * Switch from one playerEndpoint to another, changing the RtpEndpoint's source media.
 *
 * rtpEndpoint       - endpoint whose AUDIO and VIDEO source is replaced
 * oldPlayerEndpoint - player to disconnect from the RtpEndpoint
 * newPlayerEndpoint - player to connect to the RtpEndpoint
 * callback          - callback(null) once all four requests have completed,
 *                     or callback(error) with the first failure
 *
 * The requests are issued in the order disconnect AUDIO, disconnect VIDEO,
 * connect AUDIO, connect VIDEO; the callback waits for all of them.
 */
function switchPlayers(rtpEndpoint, oldPlayerEndpoint, newPlayerEndpoint, callback) {
    const pendingChanges = [
        oldPlayerEndpoint.disconnect(rtpEndpoint, 'AUDIO'),
        oldPlayerEndpoint.disconnect(rtpEndpoint, 'VIDEO'),
        newPlayerEndpoint.connect(rtpEndpoint, 'AUDIO'),
        newPlayerEndpoint.connect(rtpEndpoint, 'VIDEO'),
    ];

    // Two-argument then(): an exception thrown by callback(null) must not
    // trigger a second, failing invocation of the callback.
    Promise.all(pendingChanges).then(
        () => callback(null),
        (error) => callback(error)
    );
}

/*
 * Determine the correct Media Profile based on the SDP.
 * Behavior:
 * | Inbound Video Codec | Inbound Audio Codec |             RESULT             |
 * |---------------------|---------------------|--------------------------------|
 * | H264                | ANY                 | MP4                            |
 * | H264                | NONE                | MP4_VIDEO_ONLY                 |
 * | VP8                 | ANY                 | WEBM                           |
 * | VP8                 | NONE                | WEBM_VIDEO_ONLY                |
 * | NONE                | ANY                 | ERROR: NO VIDEO CODEC          |
 * | OTHER               | ANY                 | ERROR: UNSUPPORTED VIDEO CODEC |
 *
 * "ANY" / "NONE" refer to whether chooseCodec() found a codec from the
 * allowable lists in configuration.js.  Both ERROR rows end the program
 * through stopAndExit().
 *
 * sdpOffer - offer SDP text
 * id       - registry id, forwarded to stopAndExit() on the error paths
 */
function getMediaProfile(sdpOffer, id) {
    const parsedOffer = SDP.parse(sdpOffer);
    let audioCodec;
    let videoCodec;

    // One 'audio' and one 'video' section are expected; scanning all
    // sections keeps us independent of their order.
    for (const section of parsedOffer.media) {
        if (section.type == 'audio') {
            audioCodec = chooseCodec(section.rtp, config.allowableAudioCodecs);
        }
        if (section.type == 'video') {
            videoCodec = chooseCodec(section.rtp, config.allowableVideoCodecs);
        }
    }

    // A usable video codec is mandatory.
    if (!videoCodec) {
        stopAndExit(-6, id, "No acceptable video Codec found in offer:\n" + sdpOffer);
    }

    // Audio is optional: without it the *_VIDEO_ONLY profiles are used.
    const hasAudio = Boolean(audioCodec);
    if (hasAudio) {
        debuglog("Detected Incoming Codecs: " + audioCodec + " and " + videoCodec);
    }
    else {
        debuglog("WARNING: No acceptable Audio Codec found. (configuration.js:allowableAudioCodecs)");
        debuglog("Detected Video Codec: " + videoCodec);
    }

    if (videoCodec == "VP8") {
        return hasAudio ? 'WEBM' : 'WEBM_VIDEO_ONLY';
    }
    if (videoCodec == "H264") {
        return hasAudio ? 'MP4' : 'MP4_VIDEO_ONLY';
    }

    // The chosen video codec has no matching recording profile.
    stopAndExit(-7, id, "No acceptable Video Codec found. (configuration.js:allowableVideoCodecs) Offer:\n" + sdpOffer);
}

/*
 * Walks the offered codecs in order and returns the entry of
 * acceptableCodecs that loosely equals the first offer's codec name with
 * such a match.
 *
 * codecOffers      - iterable of offer objects; each one's .codec is compared
 * acceptableCodecs - array of allowed codec names
 *
 * Returns undefined when either argument is missing or nothing matches.
 */
function chooseCodec(codecOffers, acceptableCodecs) {
    // Nothing offered, or nothing allowed: there is nothing to match.
    const canMatch = Boolean(codecOffers) && Boolean(acceptableCodecs);
    if (!canMatch) {
        return undefined;
    }

    for (const candidate of codecOffers) {
        const offeredName = candidate.codec;
        const allowed = acceptableCodecs.find((name) => name == offeredName);
        if (!allowed) {
            continue; // not on the allow list, try the next offer
        }
        return allowed;
    }

    // Ran out of offers without a match.
    return undefined;
}


/*
 * Timestamp in the format "YYYYMMDD_HHMMSS", using local time.
 *
 * time - Date to format; defaults to the current system time
 *
 * Only the month needs the +1 adjustment (getMonth() is zero-based).
 * getHours() already returns 0-23 and is used as is.
 */
function getTimestampString(time = new Date()) {
    // Left-pads single-digit values with a zero.
    const pad = (value) => (value < 10 ? '0' : '') + value;

    const year = time.getFullYear();
    const month = pad(time.getMonth() + 1);
    const day = pad(time.getDate());
    const hours = pad(time.getHours());
    const minutes = pad(time.getMinutes());
    const seconds = pad(time.getSeconds());
    return `${year}${month}${day}_${hours}${minutes}${seconds}`;
}


/*
 * Timestamp in the format "[YYYY-MM-DD HH:MM:SS]", using local time.
 *
 * time - Date to format; defaults to the current system time
 *
 * Only the month needs the +1 adjustment (getMonth() is zero-based).
 * getHours() already returns 0-23 and is used as is.
 */
function getTimeForLog(time = new Date()) {
    // Left-pads single-digit values with a zero.
    const pad = (value) => (value < 10 ? '0' : '') + value;

    const datePart = `${time.getFullYear()}-${pad(time.getMonth() + 1)}-${pad(time.getDate())}`;
    const timePart = `${pad(time.getHours())}:${pad(time.getMinutes())}:${pad(time.getSeconds())}`;
    return `[${datePart} ${timePart}]`;
}

/*
 * Forwards the message to log() (without an extension) when
 * config.debug is greater than 0; otherwise does nothing.
 */
function debuglog(message) {
    const debugEnabled = config.debug > 0;
    if (!debugEnabled) {
        return;
    }
    log(null, message);
}

/*
 * Writes a message to the console.
 *
 * ext     - when truthy, the line is prefixed with the timestamp from
 *           getTimeForLog() and the extension; otherwise the bare message
 *           is printed
 * message - value to print
 *
 * File logging is not implemented: with config.logToFile set, a notice is
 * printed first and the message still goes to the console.
 */
function log(ext, message) {
    if (config.logToFile) {
        console.log("FILE LOGGING NOT IMPLEMENTED YET");
    }

    if (!ext) {
        console.log(message);
        return;
    }
    console.log(getTimeForLog(), '|', ext, '|', message);
}