Open main menu

CDOT Wiki β

SLEEPy

Intel Data Analytics Acceleration Library (DAAL)

Team Member

  1. Luong Chuong
  2. Jacky Siu
  3. Woodson Delhia

Introduction

What is Big Data

Big Data is data that is so big that traditional methods of data processing fail to keep up. A lot of the time this type of data is related to human behavior measurable by computers. This maybe trends, web analytics etc. With so many people using technology, data gathered this way is becoming massive. This is one of the reasons for the rise of Big Data.

What is DAAL

DAAL is a C++ & Java / Scala library for data analytics. It's similar to MKL(Math Kernel Library) with some differences:

  • MKL focuses on computation. DAAL focuses on the entire data flow (aquisition, transformation, processing).
  • Optimized for all kinds of Intel based devices (from data center to home computers)
  • Vectorization method used for optimization

DAAL supports 3 processing modes

  • Offline Processing (Batch) - Data can fit in memory, data can be processed all at once.
  • Online Processing (Streaming) - Data is too big for memory, DAAL processes the data in chunks and combine the partial results for the final result.
  • Distributed processing - Distributes data processing. DAAL has not bound the communication method and leaves it to the developer (Hadoop, Spark, MPI etc).




How DAAL Works





Installing DAAL

DAAL is already bundle with Intel® Parallel Studio XE 2016 Composer, Professional, or Cluster Edition, but you can also purchase a stand-alone version. To install it, you will have to execute the batch file to setup the environment.


By default, the batch to setup the environment is located at the <install dir> at C:\Program files (x86)\IntelSWTools\compilers_and_libraries_2016.x.xxx\windows.


Set Environment Variables

Run the <install dir>\daal\bin\daalvars.bat script as appropriate to your target architecture.

IA-32 architecture:

 daalvars.bat ia32

Intel® 64 architecture:

 daalvars.bat intel64


 


Compiling

Tells the compiler to link with standard threaded Intel DAAL:

 /Qdaal or
 /Qdaal:parallel

Tells the compiler to link with sequential version of Intel DAAL:

 /Qdaal:sequential

Visual Studio:

  1. In Solution Explorer, go to Project > Properties > Configuration Properties > Intel Performance Libraries.
  2. From the Use Intel DAAL drop-down menu, select the appropriate linking method. For example: Multi-threaded Static Library.


NOTE: DAAL may require you to also enable TBB.


 


Library Files

Single-threaded

(non-threaded)

Intel DAAL

Multi-threaded

(internally threaded)

Intel DAAL

Static linking

daal_core.lib

daal_sequential.lib

daal_core.lib

daal_thread.lib

Dynamic linking

daal_core_dll.lib

daal_core_dll.lib

Code Examples

Batch Sorting

CSV Data:

-55.558252,63.051427,-27.793776, -75.622534,61.212279,-16.283311, -86.747071,-28.132241,-17.824316, -34.172101,-51.404172,14.670925, -61.506308,48.248030,-99.235341, 9.746765,-89.879258,55.561778, 48.896723,-32.648097,48.313603, -15.346015,9.769776,-33.483281, 56.726081,-87.272631,8.724224, -1.926802,54.960580,-78.723429, 45.237223,-79.764218,-47.271926, 84.138339,11.547818,-92.962952, 46.711824,-42.623510,-34.664673, 55.813112,19.803475,4.807766, -55.474098,-72.163755,89.425736, -7.566596,-77.829218,58.630172, -76.081937,-12.089445,-44.065054, -25.888944,46.425499,-37.515164, -30.201387,-16.237217,-50.716591, -88.085869,60.136249,54.812866

Code:

/* file: sorting_batch.cpp
* Copyright 2014-2016 Intel Corporation All Rights Reserved.*/

#include "daal.h"
#include "service.h"

using namespace daal;
using namespace daal::algorithms;
using namespace daal::data_management;
using namespace std;

/* Input data set parameters */
string datasetFileName = "../data/batch/sorting.csv";

int main(int argc, char *argv[])
{
    checkArguments(argc, argv, 1, &datasetFileName);

    /* Initialize FileDataSource<CSVFeatureManager> to retrieve the input data from a .csv file */
    FileDataSource<CSVFeatureManager> dataSource(datasetFileName, DataSource::doAllocateNumericTable, DataSource::doDictionaryFromContext);

    /* Retrieve the data from the input file */
    dataSource.loadDataBlock();

    /* Create algorithm objects to sort data using the default (radix) method */
    sorting::Batch<> algorithm;

    /* Print the input observations matrix */
    printNumericTable(dataSource.getNumericTable(), "Initial matrix of observations:");

    /* Set input objects for the algorithm */
    algorithm.input.set(sorting::data, dataSource.getNumericTable());

    /* Sort data observations */
    algorithm.compute();

    /* Get the sorting result */
    services::SharedPtr<sorting::Result> res = algorithm.getResult();

    printNumericTable(res->get(sorting::sortedData), "Sorted matrix of observations:");

    return 0;
}

Results:

 

The data is sorted from smallest to largest per column.

Data Blocks:

dataSource.loadDataBlock(5);
//dataSource.loadDataBlock(5);
dataSource.loadDataBlock(5);
dataSource.loadDataBlock(5);
   

"Blocks" of data are being loaded 5 rows at a time. This allows us to easily section off data to process. This is also one way of distributing data to MPI etc.


Data Matrix Structure

Code:

/* file: datastructures_soa.cpp */
/*******************************************************************************
!  Copyright(C) 2014-2015 Intel Corporation. All Rights Reserved.


#include "daal.h"
#include "service.h"

using namespace daal;

const char *toString(DataFeatureUtils::FeatureType v);
const char *toString(DataFeatureUtils::InternalNumType v);

int main()
{
    std::cout << "Structure of array (SOA) numeric table example" << std::endl << std::endl;

    const size_t firstReadRow = 0;
    const size_t nRead = 4;
    size_t readFeatureIdx;

    /*Example of using an SOA numeric table*/
    const size_t nObservations = 10;
    const size_t nFeatures = 4;
    double dDataSOA[nObservations] = {1.0, 1.2, 1.4, 1.6, 1.8, 2.0, 2.2, 2.4, 2.6, 2.8};
    float  fDataSOA[nObservations] = {3.1f, 3.2f, 3.3f, 3.4f, 3.5f, 3.6f, 3.7f, 3.8f, 3.9f, 4.0f};
    int    iDataSOA[nObservations] = { -10, -20, -30, -40, -50, -60, -70, -80, -90, -100};
    int    cDataSOA[nObservations] = {1, 2, 3, 4, 5, 1, 2, 3, 4, 5};

    /* Construct an SOA numeric table with nObservations rows and nFeatures columns */
    SOANumericTable dataTable(nFeatures, nObservations);
    dataTable.setArray<int>   (cDataSOA, 0);
    dataTable.setArray<float> (fDataSOA, 1);
    dataTable.setArray<double>(dDataSOA, 2);
    dataTable.setArray<int>   (iDataSOA, 3);

    /* Read a block of rows */
    BlockDescriptor<double> doubleBlock;
    dataTable.getBlockOfRows(firstReadRow, nRead, readOnly, doubleBlock);
    printArray<double>(doubleBlock.getBlockPtr(), nFeatures, doubleBlock.getNumberOfRows(), "Print SOA data structures as double:");
    dataTable.releaseBlockOfRows(doubleBlock);

    /* Read a feature (column) and write a new value into it */
    readFeatureIdx = 0;
    BlockDescriptor<int> intBlock;
    dataTable.getBlockOfColumnValues(readFeatureIdx, firstReadRow, nObservations, readOnly, intBlock);
    printArray<int>(intBlock.getBlockPtr(), 1, intBlock.getNumberOfRows(), "Print the first feature of SOA:");
    dataTable.releaseBlockOfColumnValues(intBlock);

    /* Get the dictionary and the number of features */
    NumericTableDictionary *pDictionary = dataTable.getDictionary();
    std::cout << "Number of features in table: " << pDictionary->getNumberOfFeatures() << std::endl;
    std::cout << std::endl;


	system("pause");
    return 0;
}

 


Compressor

Code:

/* file: compressor.cpp */
/*******************************************************************************
!  Copyright(C) 2014-2015 Intel Corporation. All Rights Reserved.

#include "daal.h"
#include "service.h"

using namespace std;
using namespace daal;
using namespace data_management;

string datasetFileName  = "../data/batch/logitboost_train.csv";

DataBlock sentDataStream;        /* Data stream to compress and send */
DataBlock uncompressedDataBlock; /* Current block of data stream to compress */
DataBlock compressedDataBlock;   /* Current compressed block of data */
DataBlock receivedDataStream;    /* Received uncompressed data stream */

queue<DataBlock> sendReceiveQueue; /* Queue for sending and receiving compressed data blocks */

const size_t maxDataBlockSize = 16384; /* Maximum size of a data block */

bool getUncompressedDataBlock(DataBlock &block);
void sendCompressedDataBlock(DataBlock &block);
bool receiveCompressedDataBlock(DataBlock &block);
void prepareMemory();
void releaseMemory();
void printCRC32();

int main(int argc, char *argv[])
{
    checkArguments(argc, argv, 1, &datasetFileName);

    /* Read data from a file and allocate memory */
    prepareMemory();

    /* Create a compressor */
    Compressor<zlib> compressor;

    /* Receive the next data block for compression */
    while(getUncompressedDataBlock(uncompressedDataBlock))
    {
        /* Associate data to compress with the compressor */
        compressor.setInputDataBlock(uncompressedDataBlock);

        /* Memory for a compressed block might not be enough to compress the input block at once */
        do
        {
            /* Compress uncompressedDataBlock to compressedDataBlock */
            compressor.run(compressedDataBlock.getPtr(), maxDataBlockSize, 0);

            /* Get the actual size of a compressed block */
            compressedDataBlock.setSize(compressor.getUsedOutputDataBlockSize());

            /* Send the current compressed block */
            sendCompressedDataBlock(compressedDataBlock);
        }
        /* Check if an additional data block is needed to complete compression */
        while (compressor.isOutputDataBlockFull());
    }

    /* Create a decompressor */
    Decompressor<zlib> decompressor;

    /* Receive compressed data by blocks */
    while(receiveCompressedDataBlock(compressedDataBlock))
    {
        /* Associate compressed data with the decompressor */
        decompressor.setInputDataBlock(compressedDataBlock);

        /* Decompress an incoming block to the end of receivedDataStream */
        decompressor.run(receivedDataStream.getPtr(), maxDataBlockSize, receivedDataStream.getSize());

        /* Update the size of actual data in receivedDataStream */
        receivedDataStream.setSize(receivedDataStream.getSize() + decompressor.getUsedOutputDataBlockSize());
    }

    /* Compute and print checksums for sentDataStream and receivedDataStream */
    printCRC32();

    releaseMemory();

    return 0;
}

void prepareMemory()
{
    /* Allocate sentDataStream and read an input file */
    byte *data;
    sentDataStream.setSize(readTextFile(datasetFileName, &data));
    sentDataStream.setPtr(data);

    byte *compressedData = (byte *)daal::services::daal_malloc(maxDataBlockSize);
    checkAllocation(compressedData);
    compressedDataBlock.setPtr(compressedData);
    compressedDataBlock.setSize(maxDataBlockSize);

    byte *receivedData = (byte *)daal::services::daal_malloc(sentDataStream.getSize());
    checkAllocation(receivedData);
    receivedDataStream.setPtr(receivedData);
}

bool getUncompressedDataBlock(DataBlock &block)
{
    static size_t availableDataSize = sentDataStream.getSize();

    /* Calculate the current block size and ptr */
    if(availableDataSize >= maxDataBlockSize)
    {
        block.setSize(maxDataBlockSize);
        block.setPtr(sentDataStream.getPtr() + sentDataStream.getSize() - availableDataSize);
        availableDataSize -= maxDataBlockSize;
    }
    else if((availableDataSize < maxDataBlockSize) && (availableDataSize > 0))
    {
        block.setSize(availableDataSize);
        block.setPtr(sentDataStream.getPtr() + sentDataStream.getSize() - availableDataSize);
        availableDataSize = 0;
    }
    else
    {
        return false;
    }

    return true;
}

void sendCompressedDataBlock(DataBlock &block)
{
    DataBlock currentBlock;

    /* Allocate memory for the current compressed block in the queue */
    byte *currentPtr = (byte *)daal::services::daal_malloc(block.getSize());
    checkAllocation(currentPtr);
    currentBlock.setPtr(currentPtr);

    /* Copy an incoming block to the current compressed block */
    currentBlock.setSize(block.getSize());
    copyBytes(currentBlock.getPtr(), block.getPtr(), currentBlock.getSize());

    /* Push the current compressed block to the queue */
    sendReceiveQueue.push(currentBlock);

    return;
}

bool receiveCompressedDataBlock(DataBlock &block)
{
    DataBlock currentBlock;

    /* Stop at the end of the queue */
    if(sendReceiveQueue.empty())
    {
        return false;
    }

    /* Receive the current compressed block from the queue */
    currentBlock = sendReceiveQueue.front();
    block.setSize(currentBlock.getSize());
    copyBytes(block.getPtr(), currentBlock.getPtr(), block.getSize());

    /* Release memory of the current compressed block in the queue */
    daal::services::daal_free(currentBlock.getPtr());

    sendReceiveQueue.pop();

    return true;
}

void printCRC32()
{
    unsigned int crcSentDataStream = 0;
    unsigned int crcReceivedDataStream = 0;

    /* Compute checksums for full input data and full received data */
    crcSentDataStream = getCRC32(sentDataStream.getPtr(), crcSentDataStream, sentDataStream.getSize());
    crcReceivedDataStream = getCRC32(receivedDataStream.getPtr(), crcReceivedDataStream, receivedDataStream.getSize());

    cout << endl << "Compression example program results:" << endl << endl;

    cout << "Input data checksum:    0x" << hex << crcSentDataStream << endl;
    cout << "Received data checksum: 0x" << hex << crcReceivedDataStream << endl;

    if (sentDataStream.getSize() != receivedDataStream.getSize())
    {
        cout << "ERROR: Received data size mismatches with the sent data size" << endl;
    }
    else if (crcSentDataStream != crcReceivedDataStream)
    {
        cout << "ERROR: Received data CRC mismatches with the sent data CRC" << endl;
    }
    else
    {
        cout << "OK: Received data CRC matches with the sent data CRC" << endl;
    }
}

void releaseMemory()
{
    if(compressedDataBlock.getPtr())
    {
        daal::services::daal_free(compressedDataBlock.getPtr());
    }
    if(receivedDataStream.getPtr())
    {
        daal::services::daal_free(receivedDataStream.getPtr());
    }
    if(sentDataStream.getPtr())
    {
        delete [] sentDataStream.getPtr();
    }
}

 


Useful Links

  1. https://software.intel.com/en-us/daal
  2. Sorting: https://software.intel.com/en-us/node/610127
  3. Data Structure: https://software.intel.com/en-us/node/599565
  4. Compressor: https://software.intel.com/en-us/node/599552
  5. DAAL Guide Book: https://software.intel.com/en-us/daal-programming-guide