Source: connection.js

/**
 * @file connection.js is the file for the Connection class. Connection is used 
 * to interact with a Database instance. 
 */
"use strict";

const dispatcher = require("./dispatcher.js");
const QueryResult = require("./query_result.js");
const PreparedStatement = require("./prepared_statement.js");

class Connection {
  /**
   * Create a new Connection object. The constructor only records its
   * arguments; the connection itself is initialized lazily, the first time a
   * method needs it. To initialize the connection immediately, call the
   * `init()` function on the returned object.
   * @param {kuzu.Database} database the database object to connect to.
   * @param {Number} numThreads the maximum number of threads to use for query
   * execution. The value is forwarded unchanged when the connection is
   * constructed on the worker; it defaults to `null`.
   */
  constructor(database, numThreads = null) {
    this._isInitialized = false;
    this._initPromise = null;
    this._id = null;
    this._isClosed = false;
    this._database = database;
    this.numThreads = numThreads;
  }

  /**
   * Initialize the connection. Calling this function is optional, as the
   * connection is initialized automatically when the first query is executed.
   * Concurrent callers share a single in-flight initialization. If the
   * initialization fails, the failure is not cached: the next call starts a
   * fresh attempt.
   * @returns {Promise<void>} resolves once the connection is initialized.
   * @throws {Error} if the worker reports a failure while constructing the
   * connection, or if the connection was closed before it was initialized.
   */
  async init() {
    if (this._isInitialized) {
      return;
    }
    if (this._isClosed) {
      // Only reachable when close() was called on a connection that was never
      // initialized; constructing one now would leave it without an owner.
      throw new Error("Connection is closed.");
    }
    if (!this._initPromise) {
      this._initPromise = (async () => {
        const databaseId = await this._database._getDatabaseObjectId();
        const worker = await dispatcher.getWorker();
        const res = await worker.connectionConstruct(databaseId, this.numThreads);
        if (!res.isSuccess) {
          throw new Error(res.error);
        }
        this._id = res.id;
        this._isInitialized = true;
      })();
    }
    const pending = this._initPromise;
    try {
      await pending;
    } finally {
      // Clear the cached promise on success AND on failure so that a failed
      // attempt can be retried. Only clear the promise this call awaited, so
      // a newer attempt started by another caller is left alone.
      if (this._initPromise === pending) {
        this._initPromise = null;
      }
    }
  }

  /**
   * Internal function to get the connection object ID, initializing the
   * connection first if that has not happened yet.
   * @private
   * @returns {Promise<*>} the ID stored by `init()`.
   * @throws {Error} if the connection is closed.
   */
  async _getConnectionObjectId() {
    if (this._isClosed) {
      throw new Error("Connection is closed.");
    }
    if (!this._isInitialized) {
      await this.init();
    }
    return this._id;
  }

  /**
   * Set the maximum number of threads to use for query execution.
   * @param {Number} numThreads the maximum number of threads to use for query
   * execution.
   * @returns {Promise<void>} resolves once the worker has applied the setting.
   * @throws {Error} if the connection is closed or the worker reports a
   * failure.
   */
  async setMaxNumThreadForExec(numThreads) {
    const connectionId = await this._getConnectionObjectId();
    const worker = await dispatcher.getWorker();
    const res = await worker.connectionSetMaxNumThreadForExec(connectionId, numThreads);
    if (!res.isSuccess) {
      throw new Error(res.error);
    }
  }

  /**
   * Set the query timeout in milliseconds.
   * @param {Number} timeout the query timeout in milliseconds.
   * @returns {Promise<void>} resolves once the worker has applied the setting.
   * @throws {Error} if the connection is closed or the worker reports a
   * failure.
   */
  async setQueryTimeout(timeout) {
    const connectionId = await this._getConnectionObjectId();
    const worker = await dispatcher.getWorker();
    const res = await worker.connectionSetQueryTimeout(connectionId, timeout);
    if (!res.isSuccess) {
      throw new Error(res.error);
    }
  }

  /**
   * Get the maximum number of threads to use for query execution.
   * @returns {Promise<Number>} the maximum number of threads to use for query
   * execution, as reported by the worker.
   * @throws {Error} if the connection is closed or the worker reports a
   * failure.
   */
  async getMaxNumThreadForExec() {
    const connectionId = await this._getConnectionObjectId();
    const worker = await dispatcher.getWorker();
    const res = await worker.connectionGetMaxNumThreadForExec(connectionId);
    if (!res.isSuccess) {
      throw new Error(res.error);
    }
    return res.numThreads;
  }

  /**
   * Execute a query.
   * @param {String} statement the statement to execute.
   * @returns {Promise<kuzu.QueryResult>} the query result, returned after its
   * `_syncValues()` call has completed.
   * @throws {Error} if the connection is closed or the worker reports a
   * failure.
   */
  async query(statement) {
    const connectionId = await this._getConnectionObjectId();
    const worker = await dispatcher.getWorker();
    const res = await worker.connectionQuery(connectionId, statement);
    if (!res.isSuccess) {
      throw new Error(res.error);
    }
    const queryResult = new QueryResult(res.id);
    await queryResult._syncValues();
    return queryResult;
  }

  /**
   * Prepare a statement for execution.
   * @param {String} statement the statement to prepare.
   * @returns {Promise<kuzu.PreparedStatement>} the prepared statement,
   * returned after its `_syncValues()` call has completed.
   * @throws {Error} if the connection is closed or the worker reports a
   * failure.
   */
  async prepare(statement) {
    const connectionId = await this._getConnectionObjectId();
    const worker = await dispatcher.getWorker();
    const res = await worker.connectionPrepare(connectionId, statement);
    if (!res.isSuccess) {
      throw new Error(res.error);
    }
    const preparedStatement = new PreparedStatement(res.id);
    await preparedStatement._syncValues();
    return preparedStatement;
  }

  /**
   * Execute a prepared statement.
   * @param {kuzu.PreparedStatement} preparedStatement the prepared statement
   * to execute, as returned by `prepare()`.
   * @param {Object} params a plain object mapping parameter names to values.
   * @returns {Promise<kuzu.QueryResult>} the query result, returned after its
   * `_syncValues()` call has completed.
   * @throws {Error} if the connection is closed or the worker reports a
   * failure.
   */
  async execute(preparedStatement, params = {}) {
    const connectionId = await this._getConnectionObjectId();
    const worker = await dispatcher.getWorker();
    const res = await worker.connectionExecute(connectionId, preparedStatement._id, params);
    if (!res.isSuccess) {
      throw new Error(res.error);
    }
    const queryResult = new QueryResult(res.id);
    await queryResult._syncValues();
    return queryResult;
  }

  /**
   * Close the connection. Closing an already closed connection is a no-op.
   * A connection that was never initialized is simply marked as closed; no
   * connection is constructed on the worker just to be closed again.
   * @returns {Promise<void>} resolves once the connection is closed.
   */
  async close() {
    if (this._isClosed) {
      return;
    }
    if (!this._isInitialized && !this._initPromise) {
      // Nothing was constructed and no initialization is in flight, so there
      // is nothing to release on the worker side.
      this._isClosed = true;
      return;
    }
    // Either already initialized, or an initialization is in flight and must
    // finish first so that the connection it creates gets closed.
    const connectionId = await this._getConnectionObjectId();
    const worker = await dispatcher.getWorker();
    await worker.connectionClose(connectionId);
    this._isClosed = true;
  }
}

// Expose the Connection class as the value exported by this module.
module.exports = Connection;