#include "daal.h"
#include "service.h"
using namespace std;
using namespace daal;
using namespace daal::algorithms;
/* Number of data blocks handled by this example. It sizes the per-block
   arrays declared below and bounds the per-block loops in main() and
   computeOnMasterNode(). */
const size_t nBlocks = 4;
/* Input CSV files, one per block: computestep1Local(block) reads
   datasetFileNames[block]. main() also hands the address of each entry to
   checkArguments() together with argc/argv. */
const string datasetFileNames[] =
{
"../data/distributed/svd_1.csv",
"../data/distributed/svd_2.csv",
"../data/distributed/svd_3.csv",
"../data/distributed/svd_4.csv"
};
/* Step 1 (per block): loads one input file and runs svd::Distributed<step1Local>;
   stores the outputs destined for steps 2 and 3. */
void computestep1Local(size_t block);
/* Step 2: runs svd::Distributed<step2Master> over the step-1 outputs of all
   blocks; stores Sigma, V and the per-block inputs for step 3. */
void computeOnMasterNode();
/* Step 3 (per block): runs svd::Distributed<step3Local> and stores the
   block's left singular matrix in Ui[block]. */
void finalizeComputestep1Local(size_t block);
/* Per-block step-1 output consumed by step 2
   (written in computestep1Local, read in computeOnMasterNode). */
services::SharedPtr<data_management::DataCollection> dataFromStep1ForStep2[nBlocks];
/* Per-block step-1 output consumed by step 3
   (written in computestep1Local, read in finalizeComputestep1Local). */
services::SharedPtr<data_management::DataCollection> dataFromStep1ForStep3[nBlocks];
/* Per-block step-2 output consumed by step 3
   (written in computeOnMasterNode, read in finalizeComputestep1Local). */
services::SharedPtr<data_management::DataCollection> dataFromStep2ForStep3[nBlocks];
/* Singular values; set in computeOnMasterNode and printed by main(). */
services::SharedPtr<NumericTable> Sigma;
/* Right singular matrix; set in computeOnMasterNode and printed by main(). */
services::SharedPtr<NumericTable> V ;
/* Left singular matrix part for each block; set in finalizeComputestep1Local.
   main() prints only Ui[0]. */
services::SharedPtr<NumericTable> Ui[nBlocks];
/*
 * Program entry point.
 *
 * Runs the three stages of the example in order -- step 1 on every block,
 * step 2 once, step 3 on every block -- and then prints the singular values,
 * the right singular matrix and the first block's part of the left singular
 * matrix.
 *
 * Returns 0.
 */
int main(int argc, char *argv[])
{
    /* NOTE(review): checkArguments() is declared in service.h, which is not
       visible here. It receives the addresses of const strings -- confirm it
       does not write through those pointers. */
    checkArguments(argc, argv, 4,
                   &datasetFileNames[0],
                   &datasetFileNames[1],
                   &datasetFileNames[2],
                   &datasetFileNames[3]);

    /* Stage 1: local computation on each block. */
    for (size_t blockIdx = 0; blockIdx != nBlocks; ++blockIdx)
    {
        computestep1Local(blockIdx);
    }

    /* Stage 2: combine the step-1 outputs of all blocks. */
    computeOnMasterNode();

    /* Stage 3: finish the computation for each block. */
    for (size_t blockIdx = 0; blockIdx != nBlocks; ++blockIdx)
    {
        finalizeComputestep1Local(blockIdx);
    }

    /* Report the results. */
    printNumericTable(Sigma, "Singular values:");
    printNumericTable(V, "Right orthogonal matrix V:");
    printNumericTable(Ui[0], "Part of left orthogonal matrix U from 1st node:", 10);

    return 0;
}
void computestep1Local(size_t block)
{
FileDataSource<CSVFeatureManager> dataSource(datasetFileNames[block], DataSource::doAllocateNumericTable,
DataSource::doDictionaryFromContext);
dataSource.loadDataBlock();
svd::Distributed<step1Local> algorithm;
algorithm.input.set( svd::data, dataSource.getNumericTable() );
algorithm.compute();
dataFromStep1ForStep2[block] = algorithm.getPartialResult()->get( svd::outputOfStep1ForStep2 );
dataFromStep1ForStep3[block] = algorithm.getPartialResult()->get( svd::outputOfStep1ForStep3 );
}
void computeOnMasterNode()
{
svd::Distributed<step2Master> algorithm;
for (size_t i = 0; i < nBlocks; i++)
{
algorithm.input.add( svd::inputOfStep2FromStep1, i, dataFromStep1ForStep2[i] );
}
algorithm.compute();
services::SharedPtr<svd::DistributedPartialResult> pres = algorithm.getPartialResult();
services::SharedPtr<KeyValueDataCollection> inputForStep3FromStep2 = pres->get( svd::outputOfStep2ForStep3 );
for (size_t i = 0; i < nBlocks; i++)
{
dataFromStep2ForStep3[i] = services::staticPointerCast<data_management::DataCollection, SerializationIface>((*inputForStep3FromStep2)[i]);
}
services::SharedPtr<svd::Result> res = algorithm.getResult();
Sigma = res->get(svd::singularValues );
V = res->get(svd::rightSingularMatrix);
}
void finalizeComputestep1Local(size_t block)
{
svd::Distributed<step3Local> algorithm;
algorithm.input.set( svd::inputOfStep3FromStep1, dataFromStep1ForStep3[block] );
algorithm.input.set( svd::inputOfStep3FromStep2, dataFromStep2ForStep3[block] );
algorithm.compute();
algorithm.finalizeCompute();
services::SharedPtr<svd::Result> res = algorithm.getResult();
Ui[block] = res->get(svd::leftSingularMatrix);
}