#include "daal.h"
#include "service.h"
using namespace std;
using namespace daal;
using namespace data_management;
/* Default input file; passed to checkArguments() in main() and read by prepareMemory() */
string datasetFileName = "../data/online/logitboost_train.csv";
/* Whole contents of the input file, loaded by prepareMemory() */
DataBlock sentDataStream;
/* Window into sentDataStream that getDataBlock() fills in and main() feeds to the compression stream */
DataBlock uncompressedDataBlock;
/* Block filled in by receiveDataBlock() and fed to the decompression stream */
DataBlock compressedDataBlock;
/* Buffer in which main() accumulates the decompressed output */
DataBlock receivedDataStream;
/* Queue of blocks pushed by sendDataBlock() and popped by receiveDataBlock() */
queue<DataBlock> sendReceiveQueue;
/* Largest number of bytes getDataBlock() hands out per call */
const size_t maxDataBlockSize = 16384;
/* Maximum number of bytes requested from the decompression stream per copy call */
const size_t userDefinedBlockSize = 7000;
/* Points 'block' at the next unread part of sentDataStream; returns false when nothing is left */
bool getDataBlock(DataBlock &block);
/* Pushes a copy of 'block' onto sendReceiveQueue */
void sendDataBlock(DataBlock *block);
/* Pops the oldest queued block into 'block'; returns false when the queue is empty */
bool receiveDataBlock(DataBlock &block);
/* Reads the input file and allocates the buffer for the decompressed data */
void prepareMemory();
/* Frees the buffers owned by the global data blocks */
void releaseMemory();
/* Prints and compares the CRC32 checksums of the sent and the received data */
void printCRC32();
int main(int argc, char *argv[])
{
checkArguments(argc, argv, 1, &datasetFileName);
prepareMemory();
Compressor<zlib> compressor;
compressor.parameter.gzHeader = true;
compressor.parameter.level = level9;
CompressionStream compressionStream(&compressor);
while(getDataBlock(uncompressedDataBlock))
{
compressionStream << uncompressedDataBlock;
services::SharedPtr<DataBlockCollection> compressedBlocks = compressionStream.getCompressedBlocksCollection();
for(size_t i = 0; i < compressedBlocks->size(); i++)
{
sendDataBlock((*compressedBlocks)[i].get());
}
}
Decompressor<zlib> decompressor;
decompressor.parameter.gzHeader = true;
DecompressionStream decompressionStream(&decompressor);
size_t readSize = 0;
while (receiveDataBlock(compressedDataBlock))
{
decompressionStream << compressedDataBlock;
do
{
readSize = decompressionStream.copyDecompressedArray(receivedDataStream.getPtr() + receivedDataStream.getSize(),
userDefinedBlockSize);
receivedDataStream.setSize(receivedDataStream.getSize() + readSize);
}
while (readSize != 0);
}
printCRC32();
releaseMemory();
return 0;
}
void prepareMemory()
{
byte *data;
sentDataStream.setSize(readTextFile(datasetFileName, &data));
sentDataStream.setPtr(data);
byte *receivedData = (byte *)daal::services::daal_malloc(sentDataStream.getSize());
checkAllocation(receivedData);
receivedDataStream.setPtr(receivedData);
}
/*
 * Points 'block' at the next unread part of sentDataStream, at most
 * maxDataBlockSize bytes long. No data is copied.
 *
 * Returns true when a block was produced and false once all of
 * sentDataStream has been handed out.
 */
bool getDataBlock(DataBlock &block)
{
    /* Bytes not yet handed out; initialised from the stream size on the first call */
    static size_t bytesLeft = sentDataStream.getSize();

    if (bytesLeft == 0)
    {
        return false;
    }

    /* A full-sized block while enough data remains, otherwise the shorter tail */
    const size_t chunkSize = (bytesLeft < maxDataBlockSize) ? bytesLeft : maxDataBlockSize;

    block.setSize(chunkSize);
    block.setPtr(sentDataStream.getPtr() + (sentDataStream.getSize() - bytesLeft));
    bytesLeft -= chunkSize;
    return true;
}
/*
 * Copies the contents of 'block' into a buffer obtained from daal_malloc()
 * and pushes a DataBlock describing that copy onto sendReceiveQueue.
 * receiveDataBlock() releases the buffer with daal_free().
 */
void sendDataBlock(DataBlock *block)
{
    const size_t payloadSize = block->getSize();

    byte *payload = (byte *)daal::services::daal_malloc(payloadSize);
    checkAllocation(payload);

    DataBlock queuedBlock;
    queuedBlock.setPtr(payload);
    queuedBlock.setSize(payloadSize);
    copyBytes(queuedBlock.getPtr(), block->getPtr(), queuedBlock.getSize());

    sendReceiveQueue.push(queuedBlock);
}
/*
 * Moves the oldest queued block into 'block'.
 *
 * Any buffer 'block' already points at is released with delete[], a new
 * buffer is allocated with new[], the queued bytes are copied into it, and
 * the queued buffer is released with daal_free().
 *
 * Returns false when sendReceiveQueue is empty, true otherwise.
 */
bool receiveDataBlock(DataBlock &block)
{
    if (sendReceiveQueue.empty())
    {
        return false;
    }

    DataBlock incoming = sendReceiveQueue.front();
    const size_t incomingSize = incoming.getSize();

    /* delete[] on a null pointer is a no-op, so no explicit check is needed */
    delete[] block.getPtr();
    block.setPtr(new byte[incomingSize]);
    block.setSize(incomingSize);

    copyBytes(block.getPtr(), incoming.getPtr(), block.getSize());

    daal::services::daal_free(incoming.getPtr());
    sendReceiveQueue.pop();
    return true;
}
/*
 * Computes the CRC32 checksums of sentDataStream and receivedDataStream,
 * prints both in hexadecimal and reports whether the sizes and checksums
 * of the two streams match.
 *
 * The formatting flags of cout are saved before printing and restored
 * afterwards, so the sticky 'hex' manipulator does not affect integer
 * output made after this function returns.
 */
void printCRC32()
{
    unsigned int crcSentDataStream = 0;
    unsigned int crcReceivedDataStream = 0;

    crcSentDataStream = getCRC32(sentDataStream.getPtr(), crcSentDataStream, sentDataStream.getSize());
    crcReceivedDataStream = getCRC32(receivedDataStream.getPtr(), crcReceivedDataStream, receivedDataStream.getSize());

    /* Remember the caller's formatting state before switching to hex */
    const std::ios_base::fmtflags savedFlags = cout.flags();

    cout << endl << "Compression example program results:" << endl << endl;
    cout << "Input data checksum: 0x" << hex << crcSentDataStream << endl;
    cout << "Received data checksum: 0x" << hex << crcReceivedDataStream << endl;

    /* Undo the 'hex' manipulator */
    cout.flags(savedFlags);

    /* A size mismatch is reported first; checksums are only compared for equal sizes */
    if (sentDataStream.getSize() != receivedDataStream.getSize())
    {
        cout << "ERROR: Received data size mismatches with the sent data size" << endl;
    }
    else if (crcSentDataStream != crcReceivedDataStream)
    {
        cout << "ERROR: Received data CRC mismatches with the sent data CRC" << endl;
    }
    else
    {
        cout << "OK: Received data CRC matches with the sent data CRC" << endl;
    }
}
void releaseMemory()
{
if(compressedDataBlock.getPtr())
{
delete [] compressedDataBlock.getPtr();
}
if(receivedDataStream.getPtr())
{
daal::services::daal_free(receivedDataStream.getPtr());
}
if(sentDataStream.getPtr())
{
delete [] sentDataStream.getPtr();
}
}