java - Can we put a computation task inside the setup() method of a Mapper class in MapReduce code?
I have used the setup() method inside my mapper class. There is also a user-defined method, apriorigenk(), defined in the mapper class and invoked in the map() method.
Now the problem is: as far as I know, the map method is called for each line of input. Suppose there are 100 lines; then the map method is called 100 times, and it calls the apriorigenk method each time accordingly. But there is no need to call apriorigenk inside the map method every time the map method is called, i.e. the result of the apriorigenk method is common to all lines of input to the map method. The apriorigenk method is very CPU intensive, so it increases the computation time when it is called again and again. Can we manage somehow to call apriorigenk a single time and use its result in the map method each time? I have tried to keep apriorigenk inside the setup method so that it is called only one time, but surprisingly that slows down execution to a great extent.
Here is the code:
import datastructuresv2.itemsettrie; public class aprioritriemapper extends mapper<object, text, text, intwritable> { public static enum state { updated } private final static intwritable 1 = new intwritable(1); private text itemset = new text(); private configuration conf; private stringtokenizer fitemset; // store 1 line of previous output file of frequent itemsets private itemsettrie trielk_1 = null; // prefix tree store candidate (k-1)-itemsets of previous pass private int k; // itemsetsize or iteration no. // private itemsettrie trieck = null; // prefix tree store candidate k-itemsets public void setup(context context) throws ioexception, interruptedexception { conf = context.getconfiguration(); uri[] previousoutputuris = job.getinstance(conf).getcachefiles(); k = conf.getint("k", k); trielk_1 = new itemsettrie(); (uri previousoutputuri : previousoutputuris) { path previousoutputpath = new path(previousoutputuri.getpath()); string previousoutputfilename = previousoutputpath.getname().tostring(); filteritemset(previousoutputfilename, trielk_1); } // trieck = apriorigenk(trielk_1, k-1); // candidate generation prefix tree of size k-1 }// end method setup //trim count each line , store itemset private void filteritemset(string filename, itemsettrie trielk_1) { try { bufferedreader fis = new bufferedreader(new filereader(filename)); string line = null; // trielk_1 = new itemsettrie(); while ((line = fis.readline()) != null) { fitemset = new stringtokenizer(line, "\t"); trielk_1.insertcandidateitemset(fitemset.nexttoken()); } fis.close(); } catch (ioexception ioe) { system.err.println("caught exception while parsing cached file '" + filename + "' : " + stringutils.stringifyexception(ioe)); } }// end method filteritemset public void map(object key, text value, context context) throws ioexception, interruptedexception { stringtokenizer items = new stringtokenizer(value.tostring().tolowercase()," \t\n\r\f,.:;?![]'"); // tokenize transaction linkedlist <string>itemlist 
= new linkedlist<string>(); // store tokens or itemse of transaction linkedlist <string>listct; // list of subset of transaction candidates // map <string, integer>mapct; // list of subset of transaction candidates support count itemsettrie trieck = null; // prefix tree store candidate k-itemsets stringtokenizer candidate; trieck = apriorigenk(trielk_1, k-1); // candidate generation prefix tree of size k-1 if(trieck.numberofcandidate() > 0) context.getcounter(state.updated).increment(1); // increment counter // optimization: if transaction size less candidate size should not checked if(items.counttokens() >= k) { while (items.hasmoretokens()) // add tokens of transaction list itemlist.add(items.nexttoken()); // use either simple linkedlist listct or map mapct listct = trieck.candidatesupportcount1(itemlist, k); for(string listctmember : listct) // generate (key, value) pair. work on listct { candidate = new stringtokenizer(listctmember, "\n"); if(candidate.hasmoretokens()) { itemset.set(candidate.nexttoken()); context.write(itemset, one); } } } // end if } // end method map // generating candidate prefix tree of size k using prefix tree of size k-1 public itemsettrie apriorigenk(itemsettrie trielk_1, int itemsetsize) // itemsetsize of trie lk_1 { itemsettrie candidatetree = new itemsettrie(); // local prefix tree store candidates k-itemsets trielk_1.candidategenk(candidatetree, itemsetsize); // new candidate prefix tree obtained return candidatetree; // return prefix tree of size k } // end method apriorigenk } //end class triebasedspcitemsetmapper
Here is the driver class:
public class aprioritrie { private static logger log = logger.getlogger(aprioritrie.class);
public static void main(string[] args) throws exception { configuration conf = new configuration(); // string minsup = "1"; string minsup = null; list<string> otherargs = new arraylist<string>(); (int i=0; < args.length; ++i) { if ("-minsup".equals(args[i])) minsup = args[++i]; else otherargs.add(args[i]); } conf.set("min_sup", minsup); log.info("started counting 1-itemset ...................."); date date; long starttime, endtime; // recording start , end time of job date = new date(); starttime = date.gettime(); // starting timer // phase-1 job job = job.getinstance(conf, "aprioritrie: iteration-1"); job.setjarbyclass(aprioribasedalgorithms.aprioritrie.class); job.setmapperclass(oneitemsetmapper.class); job.setcombinerclass(oneitemsetcombiner.class); job.setreducerclass(oneitemsetreducer.class); // job.setoutputkeyclass(text.class); job.setoutputkeyclass(intwritable.class); job.setoutputvalueclass(intwritable.class); job.setinputformatclass(nlineinputformat.class); nlineinputformat.setnumlinespersplit(job, 10000); // set specific no. 
of line of records // path inputpath = new path("hdfs://hadoopmaster:9000/user/hduser/sample-transactions1/"); path inputpath = new path(otherargs.get(0)); // path outputpath = new path("hdfs://hadoopmaster:9000/user/hduser/aprioritrie/fis-1"); path outputpath = new path(otherargs.get(1)+"/fis-1"); fileinputformat.setinputpaths(job, inputpath); fileoutputformat.setoutputpath(job, outputpath); if(job.waitforcompletion(true)) log.info("successfully- completed frequent 1-itemsets geneation."); else log.info("error- completed frequent 1-itemsets geneation."); // phase-k >=2 int iteration = 1; long counter; { configuration conf2 = new configuration(); conf2.set("min_sup", minsup); conf2.setint("k", iteration+1); log.info("started counting "+(iteration+1)+"-itemsets .................."); job job2 = job.getinstance(conf2, "aprioritrie: iteration-"+(iteration+1)); job2.setjarbyclass(aprioribasedalgorithms.aprioritrie.class); job2.setmapperclass(aprioritriemapper.class); job2.setcombinerclass(itemsetcombiner.class); job2.setreducerclass(itemsetreducer.class); job2.setoutputkeyclass(text.class); job2.setoutputvalueclass(intwritable.class); job2.setnumreducetasks(4); // break output in 3 files job2.setinputformatclass(nlineinputformat.class); nlineinputformat.setnumlinespersplit(job2, 10000); filesystem fs = filesystem.get(new uri("hdfs://hadoopmaster:9000"), conf2); // filestatus[] status = fs.liststatus(new path("hdfs://hadoopmaster:9000/user/hduser/aprioritrie/fis-"+iteration+"/")); filestatus[] status = fs.liststatus(new path(otherargs.get(1)+"/fis-"+iteration)); (int i=0;i<status.length;i++) { job2.addcachefile(status[i].getpath().touri()); // add files inside output fis //job2.addfiletoclasspath(status[i].getpath()); } // input same these job // outputpath = new path("hdfs://hadoopmaster:9000/user/hduser/aprioritrie/fis-"+(iteration+1)); outputpath = new path(otherargs.get(1)+"/fis-"+(iteration+1)); fileinputformat.setinputpaths(job2, inputpath); 
fileoutputformat.setoutputpath(job2, outputpath); if(job2.waitforcompletion(true)) log.info("successfully- completed frequent "+(iteration+1)+"-itemsets generation."); else log.info("error- completed frequent "+(iteration+1)+"-itemsets generation."); iteration++; counter = job2.getcounters().findcounter(aprioritriemapper.state.updated).getvalue(); } while (counter > 0); date = new date(); endtime = date.gettime(); //end timer log.info("total time (in milliseconds) = "+ (endtime-starttime)); log.info("total time (in seconds) = "+ (endtime-starttime)*0.001f); }
}
You can add the function call to the run() method of the mapper, after the setup() call. This ensures that the method is called only once per mapper.
public class mymapper extends mapper<longwritable,text,text,intwritable> { public void map(longwritable key,text value,context context) throws ioexception,interruptedexception { //do } public void myfunc(string parm) { system.out.println("parm="+parm); } public void run(context context) throws ioexception, interruptedexception { setup(context); myfunc("hello"); while(context.nextkeyvalue()) { map(context.getcurrentkey(), context.getcurrentvalue(), context); } } }
Comments
Post a Comment