Home Reference Source Test

src/main/generic/network/connection/NetworkConnection.js

/**
 * Wraps a DataChannel and exposes it as an observable connection.
 *
 * The connection re-emits the channel's events as:
 *  - 'message' (msg, connection)
 *  - 'error'   (error, connection)
 *  - 'close'   (type, reason, connection) - fired at most once per connection.
 *
 * It also keeps track of the number of bytes sent/received and of the last
 * error reported by the channel.
 */
class NetworkConnection extends Observable {
    /**
     * @param {DataChannel} channel The underlying channel; its 'message', 'close' and 'error' events are subscribed to.
     * @param {number} protocol
     * @param {NetAddress} netAddress
     * @param {PeerAddress} peerAddress If falsy, the connection is flagged as inbound.
     */
    constructor(channel, protocol, netAddress, peerAddress) {
        super();
        /** @type {DataChannel} */
        this._channel = channel;

        /** @type {number} */
        this._protocol = protocol;
        /** @type {NetAddress} */
        this._netAddress = netAddress;
        /** @type {PeerAddress} */
        this._peerAddress = peerAddress;

        /** @type {number} */
        this._bytesSent = 0;
        /** @type {number} */
        this._bytesReceived = 0;

        // A connection created without a peer address is considered inbound.
        /** @type {boolean} */
        this._inbound = !peerAddress;

        /** @type {boolean} */
        this._closed = false;

        // Last error reported by the channel; used to refine the close reason.
        /** @type {*} */
        this._lastError = null;

        // Unique id for this connection.
        /** @type {number} */
        this._id = NetworkConnection._instanceCount++;

        this._channel.on('message', msg => this._onMessage(msg));
        this._channel.on('close', () => this._onClose(CloseType.CLOSED_BY_REMOTE, 'Closed by remote'));
        this._channel.on('error', e => this._onError(e));
    }

    /**
     * Size of a message for the traffic counters.
     * Falls back to 0 when the payload exposes neither `byteLength` nor
     * `length`, so that the counters can never turn into NaN.
     * @param {Uint8Array|*} msg
     * @return {number}
     * @private
     */
    static _byteLength(msg) {
        return msg.byteLength || msg.length || 0;
    }

    /**
     * Handles a message delivered by the channel: updates the received byte
     * counter and re-emits the message as a 'message' event.
     * @param {Uint8Array} msg
     * @private
     */
    _onMessage(msg) {
        // Don't emit messages if this channel is closed.
        if (this._closed) {
            return;
        }

        this._bytesReceived += NetworkConnection._byteLength(msg);
        this.fire('message', msg, this);
    }

    /**
     * Remembers the error (see _onClose) and re-emits it as an 'error' event.
     * @param {*} e
     * @private
     */
    _onError(e) {
        this._lastError = e;
        this.fire('error', e, this);
    }

    /**
     * Marks the connection as closed and fires the 'close' event.
     * Subsequent calls are ignored, so listeners are notified at most once.
     * @param {number} [type]
     * @param {string|*} [reason] Replaced by the last channel error (which may be of any type) when a remote close follows an error.
     * @private
     */
    _onClose(type, reason) {
        // Don't fire close event again when already closed.
        if (this._closed) {
            return;
        }

        // Mark this connection as closed.
        this._closed = true;

        // Propagate last network error.
        if (type === CloseType.CLOSED_BY_REMOTE && this._lastError) {
            type = CloseType.NETWORK_ERROR;
            reason = this._lastError;
        }

        // Tell listeners that this connection has closed.
        this.fire('close', type, reason, this);
    }

    /**
     * Closes the connection: notifies listeners, closes the native channel,
     * drops the channel reference and removes all listeners.
     * @param {number} [type]
     * @param {string} [reason]
     * @private
     */
    _close(type, reason) {
        if (this._closed) {
            return;
        }

        // Don't wait for the native close event to fire.
        this._onClose(type, reason);

        try {
            // Close the native channel.
            this._channel.close();
        } finally {
            // The connection is already flagged as closed at this point, so
            // this method will never get past its guard again. Release the
            // channel and the listeners even if the native close() threw;
            // the exception itself still propagates to the caller.
            this._channel = null;
            this._offAll();
        }
    }

    /**
     * @return {boolean} true iff a channel is present and its readyState is OPEN.
     * @private
     */
    _isChannelOpen() {
        // Coerce so that a released (null) channel yields false, not null.
        return !!this._channel && this._channel.readyState === DataChannel.ReadyState.OPEN;
    }

    /**
     * @return {boolean} true iff a channel is present and its readyState is CLOSING.
     * @private
     */
    _isChannelClosing() {
        // Coerce so that a released (null) channel yields false, not null.
        return !!this._channel && this._channel.readyState === DataChannel.ReadyState.CLOSING;
    }

    /**
     * @return {boolean} true iff there is no channel or its readyState is CLOSED.
     * @private
     */
    _isChannelClosed() {
        return !this._channel || this._channel.readyState === DataChannel.ReadyState.CLOSED;
    }

    /**
     * Sends a message over the channel.
     *
     * Nothing is sent when the connection is closed or the channel is not
     * open. If the channel is found to be closing/closed, the 'close' event
     * is fired right away with CloseType.CHANNEL_CLOSING. Errors thrown by
     * the channel while sending are logged and reported as `false`.
     * @param {Uint8Array} msg
     * @return {boolean} true iff the message was handed to the channel.
     */
    send(msg) {
        const logAddress = this._peerAddress || this._netAddress;
        if (this._closed) {
            return false;
        }

        // Fire close event (early) if channel is closing/closed.
        if (this._isChannelClosing() || this._isChannelClosed()) {
            Log.d(NetworkConnection, () => `Not sending data to ${logAddress} - channel closing/closed (${this._channel.readyState})`);
            this._onClose(CloseType.CHANNEL_CLOSING, 'channel closing');
            return false;
        }

        // Don't attempt to send if channel is not (yet) open.
        if (!this._isChannelOpen()) {
            Log.d(NetworkConnection, () => `Not sending data to ${logAddress} - channel not open (${this._channel.readyState})`);
            return false;
        }

        try {
            this._channel.send(msg);
            // Only count bytes once the channel accepted the message.
            this._bytesSent += NetworkConnection._byteLength(msg);
            return true;
        } catch (e) {
            Log.e(NetworkConnection, `Failed to send data to ${logAddress}: ${e.message || e}`);
            return false;
        }
    }

    /**
     * Forwards to the channel's expectMessage(); no-op once closed.
     * @param {Message.Type|Array.<Message.Type>} types
     * @param {function()} timeoutCallback
     * @param {number} [msgTimeout]
     * @param {number} [chunkTimeout]
     */
    expectMessage(types, timeoutCallback, msgTimeout, chunkTimeout) {
        if (this._closed) {
            return;
        }
        this._channel.expectMessage(types, timeoutCallback, msgTimeout, chunkTimeout);
    }

    /**
     * Forwards to the channel's isExpectingMessage(); always false once closed.
     * @param {Message.Type} type
     * @returns {boolean}
     */
    isExpectingMessage(type) {
        if (this._closed) {
            return false;
        }
        return this._channel.isExpectingMessage(type);
    }

    /**
     * Forwards to the channel's confirmExpectedMessage(); no-op once closed.
     * @param {Message.Type} type
     * @param {boolean} success
     */
    confirmExpectedMessage(type, success) {
        if (this._closed) {
            return;
        }
        this._channel.confirmExpectedMessage(type, success);
    }

    /**
     * Closes this connection. Logs the close unless the connection is
     * already closed, in which case the call has no effect.
     * @param {number} [type]
     * @param {string} [reason]
     */
    close(type, reason) {
        if (!this._closed) {
            const connType = this._inbound ? 'inbound' : 'outbound';
            Log.d(NetworkConnection, `Closing ${connType} connection #${this._id} ${this._peerAddress || this._netAddress}` + (reason ? ` - ${reason}` : '') + ` (${type})`);
        }
        this._close(type, reason);
    }

    /**
     * Two connections are equal iff they have the same id.
     * @param {NetworkConnection} o
     * @return {boolean}
     */
    equals(o) {
        return o instanceof NetworkConnection
            && this._id === o.id;
    }

    /**
     * @returns {string} The connection id as a string.
     */
    hashCode() {
        return this._id.toString();
    }

    /**
     * @return {string}
     */
    toString() {
        return `NetworkConnection{id=${this._id}, protocol=${this._protocol}, peerAddress=${this._peerAddress}, netAddress=${this._netAddress}}`;
    }

    /** @type {number} */
    get id() {
        return this._id;
    }

    /** @type {number} */
    get protocol() {
        return this._protocol;
    }

    /** @type {PeerAddress} */
    get peerAddress() {
        return this._peerAddress;
    }

    /** @type {PeerAddress} */
    set peerAddress(value) {
        this._peerAddress = value;
    }

    /** @type {NetAddress} */
    get netAddress() {
        return this._netAddress;
    }

    /** @type {NetAddress} */
    set netAddress(value) {
        this._netAddress = value;
    }

    /** @type {number} */
    get bytesSent() {
        return this._bytesSent;
    }

    /** @type {number} */
    get bytesReceived() {
        return this._bytesReceived;
    }

    /** @type {boolean} */
    get inbound() {
        return this._inbound;
    }

    /** @type {boolean} */
    get outbound() {
        return !this._inbound;
    }

    /** @type {boolean} */
    get closed() {
        return this._closed;
    }

    /**
     * The channel's lastMessageReceivedAt value, or 0 once the connection is closed.
     * @type {number}
     */
    get lastMessageReceivedAt() {
        if (this._closed) {
            return 0;
        }
        return this._channel.lastMessageReceivedAt;
    }
}
// Used to generate unique NetworkConnection ids: the constructor assigns the
// current value as the new connection's id and then increments the counter.
NetworkConnection._instanceCount = 0;
// NOTE(review): presumably makes the class available through the project's
// Class registry - confirm against the definition of Class.register.
Class.register(NetworkConnection);