#include "daal.h"
#include "service.h"
using namespace std;
using namespace daal;
using namespace data_management;
string datasetFileName = "../data/batch/logitboost_train.csv";
DataBlock sentDataStream;
DataBlock uncompressedDataBlock;
DataBlock compressedDataBlock;
DataBlock receivedDataStream;
queue<DataBlock> sendReceiveQueue;
const size_t maxDataBlockSize = 16384;
bool getUncompressedDataBlock(DataBlock &block);
void sendCompressedDataBlock(DataBlock &block);
bool receiveCompressedDataBlock(DataBlock &block);
void prepareMemory();
void releaseMemory();
void printCRC32();
int main(int argc, char *argv[])
{
checkArguments(argc, argv, 1, &datasetFileName);
prepareMemory();
Compressor<zlib> compressor;
while(getUncompressedDataBlock(uncompressedDataBlock))
{
compressor.setInputDataBlock(uncompressedDataBlock);
do
{
compressor.run(compressedDataBlock.getPtr(), maxDataBlockSize, 0);
compressedDataBlock.setSize(compressor.getUsedOutputDataBlockSize());
sendCompressedDataBlock(compressedDataBlock);
}
while (compressor.isOutputDataBlockFull());
}
Decompressor<zlib> decompressor;
while(receiveCompressedDataBlock(compressedDataBlock))
{
decompressor.setInputDataBlock(compressedDataBlock);
decompressor.run(receivedDataStream.getPtr(), maxDataBlockSize, receivedDataStream.getSize());
receivedDataStream.setSize(receivedDataStream.getSize() + decompressor.getUsedOutputDataBlockSize());
}
printCRC32();
releaseMemory();
return 0;
}
void prepareMemory()
{
byte *data;
sentDataStream.setSize(readTextFile(datasetFileName, &data));
sentDataStream.setPtr(data);
byte *compressedData = (byte *)daal::services::daal_malloc(maxDataBlockSize);
checkAllocation(compressedData);
compressedDataBlock.setPtr(compressedData);
compressedDataBlock.setSize(maxDataBlockSize);
byte *receivedData = (byte *)daal::services::daal_malloc(sentDataStream.getSize());
checkAllocation(receivedData);
receivedDataStream.setPtr(receivedData);
}
bool getUncompressedDataBlock(DataBlock &block)
{
static size_t availableDataSize = sentDataStream.getSize();
if(availableDataSize >= maxDataBlockSize)
{
block.setSize(maxDataBlockSize);
block.setPtr(sentDataStream.getPtr() + sentDataStream.getSize() - availableDataSize);
availableDataSize -= maxDataBlockSize;
}
else if((availableDataSize < maxDataBlockSize) && (availableDataSize > 0))
{
block.setSize(availableDataSize);
block.setPtr(sentDataStream.getPtr() + sentDataStream.getSize() - availableDataSize);
availableDataSize = 0;
}
else
{
return false;
}
return true;
}
void sendCompressedDataBlock(DataBlock &block)
{
DataBlock currentBlock;
byte *currentPtr = (byte *)daal::services::daal_malloc(block.getSize());
checkAllocation(currentPtr);
currentBlock.setPtr(currentPtr);
currentBlock.setSize(block.getSize());
copyBytes(currentBlock.getPtr(), block.getPtr(), currentBlock.getSize());
sendReceiveQueue.push(currentBlock);
return;
}
bool receiveCompressedDataBlock(DataBlock &block)
{
DataBlock currentBlock;
if(sendReceiveQueue.empty())
{
return false;
}
currentBlock = sendReceiveQueue.front();
block.setSize(currentBlock.getSize());
copyBytes(block.getPtr(), currentBlock.getPtr(), block.getSize());
daal::services::daal_free(currentBlock.getPtr());
sendReceiveQueue.pop();
return true;
}
void printCRC32()
{
unsigned int crcSentDataStream = 0;
unsigned int crcReceivedDataStream = 0;
crcSentDataStream = getCRC32(sentDataStream.getPtr(), crcSentDataStream, sentDataStream.getSize());
crcReceivedDataStream = getCRC32(receivedDataStream.getPtr(), crcReceivedDataStream, receivedDataStream.getSize());
cout << endl << "Compression example program results:" << endl << endl;
cout << "Input data checksum: 0x" << hex << crcSentDataStream << endl;
cout << "Received data checksum: 0x" << hex << crcReceivedDataStream << endl;
if (sentDataStream.getSize() != receivedDataStream.getSize())
{
cout << "ERROR: Received data size mismatches with the sent data size" << endl;
}
else if (crcSentDataStream != crcReceivedDataStream)
{
cout << "ERROR: Received data CRC mismatches with the sent data CRC" << endl;
}
else
{
cout << "OK: Received data CRC matches with the sent data CRC" << endl;
}
}
void releaseMemory()
{
if(compressedDataBlock.getPtr())
{
daal::services::daal_free(compressedDataBlock.getPtr());
}
if(receivedDataStream.getPtr())
{
daal::services::daal_free(receivedDataStream.getPtr());
}
if(sentDataStream.getPtr())
{
delete [] sentDataStream.getPtr();
}
}