java - Can we put some computation task inside setup method of mapper class in mapreduce code -


i have used setup() method inside mapper class. there user defined method apriorigenk() defined in mapper class , invoked in map() method.

now problem is: whatever know map method called each line of input. suppose there 100 lines method called 100 times. map method called apriorigenk method each time accordingly. there no need call apriorigenk inside map method each time when map method called. i.e. result of apriorigenk method common of line of input map method. apriorigenk method cpu intensive increases computation time when called again , again. can manage somehow call apriorigenk single time , use in map method each time. have tried keep apriorigen inside setup method can called 1 time surprisingly slow execution great extent.

here code:

import datastructuresv2.itemsettrie;  public class aprioritriemapper extends mapper<object, text, text, intwritable> {     public static enum state     {         updated     }      private final static intwritable 1 = new intwritable(1);     private text itemset = new text();      private configuration conf;     private stringtokenizer fitemset;   // store 1 line of previous output file of frequent itemsets     private itemsettrie trielk_1 = null;    // prefix tree store candidate (k-1)-itemsets of previous pass     private int k;                      // itemsetsize or iteration no. //  private itemsettrie trieck = null;          // prefix tree store candidate k-itemsets      public void setup(context context) throws ioexception, interruptedexception     {         conf = context.getconfiguration();         uri[] previousoutputuris = job.getinstance(conf).getcachefiles();         k = conf.getint("k", k);         trielk_1 = new itemsettrie();          (uri previousoutputuri : previousoutputuris)         {             path previousoutputpath = new path(previousoutputuri.getpath());             string previousoutputfilename = previousoutputpath.getname().tostring();             filteritemset(previousoutputfilename, trielk_1);         }     //  trieck = apriorigenk(trielk_1, k-1);    // candidate generation prefix tree of size k-1     }// end method setup      //trim count each line , store itemset     private void filteritemset(string filename, itemsettrie trielk_1)     {         try          {           bufferedreader fis = new bufferedreader(new filereader(filename));           string line = null;         //  trielk_1 = new itemsettrie();            while ((line = fis.readline()) != null)           {               fitemset = new stringtokenizer(line, "\t");               trielk_1.insertcandidateitemset(fitemset.nexttoken());           }           fis.close();         }         catch (ioexception ioe)         {           system.err.println("caught exception while parsing cached file '" + filename + "' : " + stringutils.stringifyexception(ioe));         }     }// end method filteritemset      public void map(object key, text value, context context) throws ioexception, interruptedexception      {         stringtokenizer items = new stringtokenizer(value.tostring().tolowercase()," \t\n\r\f,.:;?![]'"); // tokenize transaction         linkedlist <string>itemlist = new linkedlist<string>(); // store tokens or itemse of transaction          linkedlist <string>listct;      // list of subset of transaction candidates     //  map <string, integer>mapct;     // list of subset of transaction candidates support count         itemsettrie trieck = null;          // prefix tree store candidate k-itemsets         stringtokenizer candidate;          trieck = apriorigenk(trielk_1, k-1);        // candidate generation prefix tree of size k-1          if(trieck.numberofcandidate() > 0)             context.getcounter(state.updated).increment(1);     // increment counter          // optimization: if transaction size less candidate size should not checked         if(items.counttokens() >= k)         {             while (items.hasmoretokens())               // add tokens of transaction list                 itemlist.add(items.nexttoken());              // use either simple linkedlist listct or map mapct             listct = trieck.candidatesupportcount1(itemlist, k);             for(string listctmember : listct)   // generate (key, value) pair. work on listct             {                 candidate = new stringtokenizer(listctmember, "\n");                 if(candidate.hasmoretokens())                 {                     itemset.set(candidate.nexttoken()); context.write(itemset, one);                 }             }         } // end if     } // end method map      // generating candidate prefix tree of size k using prefix tree of size k-1     public itemsettrie apriorigenk(itemsettrie trielk_1, int itemsetsize)   // itemsetsize of trie lk_1     {         itemsettrie candidatetree = new itemsettrie();      // local prefix tree store candidates k-itemsets         trielk_1.candidategenk(candidatetree, itemsetsize); // new candidate prefix tree obtained         return candidatetree;                               // return prefix tree of size k     } // end method apriorigenk } //end class triebasedspcitemsetmapper 

here driver class:

public class aprioritrie { private static logger log = logger.getlogger(aprioritrie.class);

public static void main(string[] args) throws exception {     configuration conf = new configuration();  //  string minsup = "1";     string minsup = null;     list<string> otherargs = new arraylist<string>();     (int i=0; < args.length; ++i)     {         if ("-minsup".equals(args[i]))             minsup = args[++i];         else             otherargs.add(args[i]);     }      conf.set("min_sup", minsup);      log.info("started counting 1-itemset ....................");     date date; long starttime, endtime;                         // recording start , end time of job     date = new date(); starttime = date.gettime();              // starting timer      // phase-1     job job = job.getinstance(conf, "aprioritrie: iteration-1");     job.setjarbyclass(aprioribasedalgorithms.aprioritrie.class);      job.setmapperclass(oneitemsetmapper.class);     job.setcombinerclass(oneitemsetcombiner.class);     job.setreducerclass(oneitemsetreducer.class);  //  job.setoutputkeyclass(text.class);     job.setoutputkeyclass(intwritable.class);     job.setoutputvalueclass(intwritable.class);      job.setinputformatclass(nlineinputformat.class);     nlineinputformat.setnumlinespersplit(job, 10000);   // set specific no. of line of records  //  path inputpath = new path("hdfs://hadoopmaster:9000/user/hduser/sample-transactions1/");     path inputpath = new path(otherargs.get(0)); //  path outputpath = new path("hdfs://hadoopmaster:9000/user/hduser/aprioritrie/fis-1");     path outputpath = new path(otherargs.get(1)+"/fis-1");      fileinputformat.setinputpaths(job, inputpath);     fileoutputformat.setoutputpath(job, outputpath);                  if(job.waitforcompletion(true))         log.info("successfully- completed frequent 1-itemsets geneation.");     else         log.info("error- completed frequent 1-itemsets geneation.");      // phase-k >=2     int iteration = 1; long counter;         {         configuration conf2 = new configuration();         conf2.set("min_sup", minsup);         conf2.setint("k", iteration+1);          log.info("started counting "+(iteration+1)+"-itemsets ..................");         job job2 = job.getinstance(conf2, "aprioritrie: iteration-"+(iteration+1));         job2.setjarbyclass(aprioribasedalgorithms.aprioritrie.class);          job2.setmapperclass(aprioritriemapper.class);         job2.setcombinerclass(itemsetcombiner.class);         job2.setreducerclass(itemsetreducer.class);          job2.setoutputkeyclass(text.class);         job2.setoutputvalueclass(intwritable.class);          job2.setnumreducetasks(4); // break output in 3 files          job2.setinputformatclass(nlineinputformat.class);         nlineinputformat.setnumlinespersplit(job2, 10000);          filesystem fs = filesystem.get(new uri("hdfs://hadoopmaster:9000"), conf2);     //  filestatus[] status = fs.liststatus(new path("hdfs://hadoopmaster:9000/user/hduser/aprioritrie/fis-"+iteration+"/"));         filestatus[] status = fs.liststatus(new path(otherargs.get(1)+"/fis-"+iteration));         (int i=0;i<status.length;i++)         {             job2.addcachefile(status[i].getpath().touri()); // add files inside output fis             //job2.addfiletoclasspath(status[i].getpath());         }      //  input same these job     //  outputpath = new path("hdfs://hadoopmaster:9000/user/hduser/aprioritrie/fis-"+(iteration+1));         outputpath = new path(otherargs.get(1)+"/fis-"+(iteration+1));          fileinputformat.setinputpaths(job2, inputpath);         fileoutputformat.setoutputpath(job2, outputpath);          if(job2.waitforcompletion(true))             log.info("successfully- completed frequent "+(iteration+1)+"-itemsets generation.");         else             log.info("error- completed frequent "+(iteration+1)+"-itemsets generation.");          iteration++;         counter = job2.getcounters().findcounter(aprioritriemapper.state.updated).getvalue();     } while (counter > 0);      date = new date(); endtime = date.gettime();                    //end timer     log.info("total time (in milliseconds) = "+ (endtime-starttime));     log.info("total time (in seconds) = "+ (endtime-starttime)*0.001f); } 

}

you can add function call run method of mapper after setup call.this ensure method called once per mapper.

public class mymapper extends mapper<longwritable,text,text,intwritable>  {     public void map(longwritable key,text value,context context) throws ioexception,interruptedexception     {                //do      }     public void myfunc(string parm)     {         system.out.println("parm="+parm);     }     public void run(context context) throws ioexception, interruptedexception      {         setup(context);         myfunc("hello");         while(context.nextkeyvalue())         {             map(context.getcurrentkey(), context.getcurrentvalue(), context);         }      }  } 

Comments

Popular posts from this blog

php - Invalid Cofiguration - yii\base\InvalidConfigException - Yii2 -

How to show in django cms breadcrumbs full path? -

ruby on rails - npm error: tunneling socket could not be established, cause=connect ETIMEDOUT -