Java* API Reference for Intel® Data Analytics Acceleration Library 2016 Update 4

CompressionOnline.java

/* file: CompressionOnline.java */
/*******************************************************************************
* Copyright 2014-2016 Intel Corporation All Rights Reserved.
*
* The source code, information and material ("Material") contained herein is
* owned by Intel Corporation or its suppliers or licensors, and title to such
* Material remains with Intel Corporation or its suppliers or licensors. The
* Material contains proprietary information of Intel or its suppliers and
* licensors. The Material is protected by worldwide copyright laws and treaty
* provisions. No part of the Material may be used, copied, reproduced,
* modified, published, uploaded, posted, transmitted, distributed or disclosed
* in any way without Intel's prior express written permission. No license under
* any patent, copyright or other intellectual property rights in the Material
* is granted to or conferred upon you, either expressly, by implication,
* inducement, estoppel or otherwise. Any license under such intellectual
* property rights must be express and approved by Intel in writing.
*
* Unless otherwise agreed by Intel in writing, you may not remove or alter this
* notice or any other notice embedded in Materials by Intel or Intel's
* suppliers or licensors in any way.
*******************************************************************************/
/*
// Content:
// Java example of compression in the online processing mode
*/
package com.intel.daal.examples.compression;
import java.io.RandomAccessFile;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
import com.intel.daal.data_management.compression.*;
import com.intel.daal.data_management.compression.zlib.*;
import com.intel.daal.services.DaalContext;
class CompressionOnline {
static Queue<byte[]> sendReceiveQueue = new LinkedList<byte[]>(); /* Queue for sending and receiving compressed data blocks */
private static final long maxDataBlockSize = 16384; /* Maximum size of a data block */
private static final long userDefinedBlockSize = 7000; /* Size for read data from a decompression stream */
static long receivedSize = 0; /* Actual data size in receivedDataStream */
static long availableDataSize = 0; /* Data size not processed in sentDataStream */
private static final String dataset = "../data/online/logitboost_train.csv";
private static byte[] sentDataStream; /* Data stream to compress and send */
private static byte[] uncompressedDataBlock; /* Current block of the data stream to compress */
private static byte[] compressedDataBlock; /* Current compressed block of data */
private static byte[] readDataBlock; /* Current data block to store decompressed data */
private static byte[] receivedDataStream; /* Received uncompressed data stream */
private static DaalContext context = new DaalContext();
public static void main(String[] args) throws java.io.FileNotFoundException, java.io.IOException {
/* Read data from a file and allocate memory */
prepareMemory();
/* Create a compressor */
ZlibCompressor compressor = new ZlibCompressor(context);
compressor.parameter.setCompressionLevel(CompressionLevel.DefaultLevel);
compressor.parameter.setGzHeader(true);
/* Create a stream for compression */
CompressionStream compressionStream = new CompressionStream(context, compressor);
/* Receive the next data block for compression */
while ((uncompressedDataBlock = getDataBlock()) != null) {
/* Put a data block to compressionStream and compress if needed */
compressionStream.add(uncompressedDataBlock);
/* Get the size of the compressed data */
compressedDataBlock = new byte[(int) compressionStream.getCompressedDataSize()];
/* Store the compressed data in compressedDataBlock*/
compressionStream.copyCompressedArray(compressedDataBlock);
/* Send the current compressed block */
sendDataBlock(compressedDataBlock, compressedDataBlock.length);
}
/* Create a decompressor */
ZlibDecompressor decompressor = new ZlibDecompressor(context);
decompressor.parameter.setGzHeader(true);
/* Create a stream for decompression */
DecompressionStream decompressionStream = new DecompressionStream(context, decompressor);
/* Actual size of the decompressed data currently read from decompressionStream */
long readSize = 0;
/* Receive compressed data by blocks */
while ((compressedDataBlock = receiveDataBlock()) != null) {
/* Write the received block to decompressionStream */
decompressionStream.add(compressedDataBlock);
/* Asynchronous read from decompressionStream */
do {
/* Read userDefinedBlockSize bytes from decompressionStream to readDataBlock */
readSize = decompressionStream.copyDecompressedArray(readDataBlock);
/* Update the actual data size in receivedDataStream */
System.arraycopy(readDataBlock, 0, receivedDataStream, (int) receivedSize, (int) readSize);
/* Update the actual data size in receivedDataStream */
receivedSize += readSize;
} while (readSize != 0);
}
/* Compute and print checksums for sentDataStream and receivedDataStream */
printCRC32(sentDataStream, receivedDataStream);
context.dispose();
}
private static void prepareMemory() throws java.io.FileNotFoundException, java.io.IOException {
/* Read data from a file */
sentDataStream = readData();
receivedDataStream = new byte[sentDataStream.length];
/* Set the size of the data not processed in sentDataStream */
availableDataSize = sentDataStream.length;
readDataBlock = new byte[(int) userDefinedBlockSize];
}
private static byte[] readData() throws java.io.FileNotFoundException, java.io.IOException {
/* Read an input file */
RandomAccessFile file = new RandomAccessFile(dataset, "r");
int dataLength = (int) file.length();
byte[] sentData = new byte[dataLength];
file.read(sentData);
file.close();
return sentData;
}
private static byte[] getDataBlock() {
long startPosition = sentDataStream.length - availableDataSize;
byte[] currentBlock;
/* Copy the current uncompressed block */
if (availableDataSize >= maxDataBlockSize) {
currentBlock = Arrays.copyOfRange(sentDataStream, (int) startPosition,
(int) (startPosition + maxDataBlockSize));
availableDataSize -= maxDataBlockSize;
} else if ((availableDataSize < maxDataBlockSize) && (availableDataSize > 0)) {
currentBlock = Arrays.copyOfRange(sentDataStream, (int) startPosition, sentDataStream.length);
availableDataSize = 0;
} else {
return null;
}
return currentBlock;
}
private static void sendDataBlock(byte[] block, long size) {
/* Copy an incoming block to the current compressed block */
byte[] currentBlock = Arrays.copyOf(block, (int) size);
/* Push the current compressed block to the queue */
sendReceiveQueue.add(currentBlock);
}
private static byte[] receiveDataBlock() {
byte[] currentBlock;
/* Receive the current compressed block from the queue or stop at the end */
if ((currentBlock = sendReceiveQueue.poll()) == null) {
return null;
}
return currentBlock;
}
private static void printCRC32(byte[] sentData, byte[] receivedData) {
/* Compute checksums for full input data and full received data */
Checksum crcSentDataStream = new CRC32();
crcSentDataStream.update(sentData, 0, sentData.length);
Checksum crcReceivedDataStream = new CRC32();
crcReceivedDataStream.update(receivedData, 0, (int) receivedSize);
System.out.println("Compression example program results:");
System.out.println("Input data checksum: 0x" + Integer.toHexString((int) crcSentDataStream.getValue()));
System.out.println("Received data checksum: 0x" + Integer.toHexString((int) crcReceivedDataStream.getValue()));
if (sentData.length != receivedSize) {
System.out.println("ERROR: Received data size (" + receivedSize + ") mismatches with the sent data size ("
+ sentData.length + ")");
} else if (crcSentDataStream.getValue() != crcReceivedDataStream.getValue()) {
System.out.println("ERROR: Received data CRC mismatches with the sent data CRC");
} else {
System.out.println("OK: Received data CRC matches with the sent data CRC");
}
}
}