mapreduce - Best way to split log files -


need , seems such common task do: have hourly huge logfiles containing many different events. have been using hive split these events different files, in hard coded way:

from events   insert overwrite table specificevent1    events.event_type='specificevent1'   insert overwrite table specificevent2    events.event_type='specificevent2' ...; 

this problematic code must change each new event add.

we try use dynamic partitioning automatic parsing experiencing problems:

  1. if partition schema /year/month/day/hour/event cannot recover partitions of more day number monthly ~ (30 days)(24 hours)(100~ events)=~72k way many work with.
  2. if schema event/year/month/day/hour since event dynamic part forces next partitions scripted dynamic, , causes splitting take more time number of partitions grow.

is there better way (hive , non-hive solutions)?

hope others...

i found hive not way go if want split logfile many different files (file per event_type). dynamic partitions offered hive have many limitations imho.

what ended doing writing custom map-reduce jar. found old hadoop interface more suitable offers multipletextoutputformat abstract class lets implement generatefilenameforkeyvalue(). (new hadoop offers different multiple output file mechanism: multipleoutputs great if have predefined output locations, did not how have them on fly key-value)

example code:

\* run example: hadoop jar dynamicsplit.jar dynamiceventsplit.dynamiceventsplitmultifilemapreduce /event/us/incoming/2013-01-01-01/ event 2013-01-01-01 2 "[a-za-z0-9_ ]+" "/event/dynamicsplit1/" "," */ package dynamiceventsplit;  import java.io.*; import java.util.*; import org.apache.hadoop.fs.path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; import org.apache.hadoop.mapred.lib.*; import java.io.ioexception; import java.util.iterator;  import org.apache.hadoop.fs.filesystem; import org.apache.hadoop.mapred.jobconf; import org.apache.hadoop.mapred.fileoutputformat; import org.apache.hadoop.mapred.recordwriter; import org.apache.hadoop.mapred.reporter; import org.apache.hadoop.util.progressable;  public class dynamiceventsplitmultifilemapreduce {         static class map extends mapreducebase implements mapper<longwritable, text, text, text>           {             private string event_name;             private string eventnameregexp;             private int eventnamecolumnnumber;             private string columndelimeter=",";              public void configure(jobconf job)             {                 eventnameregexp=job.get("eventnameregexp");                 eventnamecolumnnumber=integer.parseint(job.get("eventnamecolumnnumber"));                 columndelimeter=job.get("columndelimeter");             }             public void map(longwritable key, text value,outputcollector<text, text> output, reporter reporter) throws ioexception              {                 //check expected event_name field exists                   string [] dall=value.tostring().split(columndelimeter);                 if (dall.length<eventnamecolumnnumber)                 {                     return;                 }                 event_name=dall[eventnamecolumnnumber-1];                 //check expected event_name valid                   if (!event_name.matches(eventnameregexp))                 {                     return;                 }                 output.collect(new text(dall[1]),value);             }         }          static class reduce extends mapreducebase implements reducer<text, text, text, text>          {             public void reduce(text key, iterator<text> values,outputcollector<text, text> output, reporter reporter) throws ioexception              {                     while (values.hasnext())                      {                         output.collect(key, values.next());                     }             }         }           static class multifileoutput extends multipletextoutputformat<text, text>          {             private string event_name;             private string site;             private string event_date;             private string year;             private string month;             private string day;             private string hour;             private string basepath;               public recordwriter<text,text> getrecordwriter(filesystem fs, jobconf job,string name, progressable arg3) throws ioexception             {                 recordwriter<text,text> rw=super.getrecordwriter(fs, job, name, arg3);                 site=job.get("site");                 event_date=job.get("date");                 year=event_date.substring(0,4);                 month=event_date.substring(5,7);                 day=event_date.substring(8,10);                 hour=event_date.substring(11,13);                 basepath=job.get("basepath");                 return rw;             }              protected string generatefilenameforkeyvalue(text key, text value,string leaf)              {                 event_name=key.tostring();                 return basepath+"event="+event_name+"/site="+site+"/year="+year+"/month="+month+"/day="+day+"/hour="+hour+"/"+leaf;             }              protected text generateactualkey(text key, text value)              {                 return null;             }         }          public static void main(string[] args) throws exception          {                 string inputfiles=args[0];                 string outputdir=args[1];                 string sitestr=args[2];                 string datestr=args[3];                 string eventnamecolumnnumber=args[4];                 string eventnameregexp=args[5];                 string basepath=args[6];                 string columndelimeter=args[7];                  configuration mycon=new configuration();                 jobconf conf = new jobconf(mycon,dynamiceventsplitmultifilemapreduce.class);                 conf.set("site",sitestr);                 conf.set("date",datestr);                  conf.setoutputkeyclass(text.class);                 conf.setmapoutputkeyclass(text.class);                 conf.setoutputvalueclass(text.class);                  conf.setmapperclass(map.class);                 conf.setreducerclass(reduce.class);                  conf.setinputformat(textinputformat.class);                 conf.setoutputformat(multifileoutput.class);                  conf.setmapspeculativeexecution(false);                 conf.setreducespeculativeexecution(false);                  fileinputformat.setinputpaths(conf,inputfiles);                 fileoutputformat.setoutputpath(conf,new path("/"+outputdir+sitestr+datestr+"/"));                  conf.set("eventnamecolumnnumber",eventnamecolumnnumber);                 conf.set("eventnameregexp",eventnameregexp);                 conf.set("basepath",basepath);                 conf.set("columndelimeter",columndelimeter);                  jobclient.runjob(conf);         } } 

Comments

Popular posts from this blog

php - mySql Join with 4 tables -

css - Text drops down with smaller window -

c# - DetailsView in ASP.Net - How to add another column on the side/add a control in each row? -