Home Reference Source Repository

src/notebook/kernel/messaging.js

/* eslint camelcase: 0 */ // <-- Per Jupyter message spec

import * as uuid from 'uuid';

import Rx from 'rxjs/Rx';

const Observable = Rx.Observable;

// TODO: the session ID should likely be in the state tree, not a singleton
export const session = uuid.v4();

export function getUsername() {
  return process.env.LOGNAME || process.env.USER || process.env.LNAME ||
    process.env.USERNAME;
}

export function createMessage(msg_type, fields) {
  const username = getUsername();
  return Object.assign({
    header: {
      username,
      session,
      msg_type,
      msg_id: uuid.v4(),
      date: new Date(),
      version: '5.0',
    },
    metadata: {},
    parent_header: {},
    content: {},
  }, fields);
}

/**
 * childOf filters out messages that don't have the parent header matching parentMessage
 * @param  {Object}  parentMessage Jupyter message protocol message
 * @return {Observable}               the resulting observable
 */
export function childOf(parentMessage) {
  const parentMessageID = parentMessage.header.msg_id;
  return Observable.create(subscriber => {
    // since we're in an arrow function `this` is from the outer scope.
    // save our inner subscription
    const subscription = this.subscribe(msg => {
      if (!msg.parent_header || !msg.parent_header.msg_id) {
        subscriber.error(new Error('no parent_header.msg_id on message'));
        return;
      }

      if (parentMessageID === msg.parent_header.msg_id) {
        subscriber.next(msg);
      }
    },
    // be sure to handle errors and completions as appropriate and
    // send them along
    err => subscriber.error(err),
    () => subscriber.complete());

    // to return now
    return subscription;
  });
}

/**
 * ofMessageType is an Rx Operator that filters on msg.header.msg_type
 * being one of messageTypes
 * @param  {Array} messageTypes e.g. ['stream', 'error']
 * @return {Observable}                 the resulting observable
 */
export function ofMessageType(messageTypes) {
  return Observable.create(subscriber => {
    // since we're in an arrow function `this` is from the outer scope.
    // save our inner subscription
    const subscription = this.subscribe(msg => {
      if (!msg.header || !msg.header.msg_type) {
        subscriber.error(new Error('no header.msg_type on message'));
        return;
      }

      if (messageTypes.indexOf(msg.header.msg_type) !== -1) {
        subscriber.next(msg);
      }
    },
    // be sure to handle errors and completions as appropriate and
    // send them along
    err => subscriber.error(err),
    () => subscriber.complete());

    // to return now
    return subscription;
  });
}

Observable.prototype.childOf = childOf;
Observable.prototype.ofMessageType = ofMessageType;