mapreduce - Best way to split log files -
I need help with what seems like a common task: we have huge hourly log files containing many different events. We have been using Hive to split these events into different files, in a hard-coded way:
FROM events INSERT OVERWRITE TABLE specificevent1 SELECT * WHERE events.event_type='specificevent1' INSERT OVERWRITE TABLE specificevent2 SELECT * WHERE events.event_type='specificevent2' ...; This is problematic because the code must change for each new event we add.
We tried to use dynamic partitioning for automatic parsing, but we are experiencing problems:
- If the partition schema is
/year/month/day/hour/event, we cannot recover partitions for more than a day, because the monthly number of partitions is ~ (30 days)(24 hours)(~100 events) = ~72k, which is far too many to work with. - If the schema is
event/year/month/day/hour, then since the event is the dynamic part, it forces the following partitions to be scripted as dynamic as well, and this causes the splitting to take more time as the number of partitions grows.
Is there a better way (both Hive and non-Hive solutions are welcome)?
Hope this helps others...
I found that Hive is not the way to go if you want to split a log file into many different files (one file per event_type). The dynamic partitions offered by Hive have too many limitations, IMHO.
What I ended up doing was writing a custom map-reduce jar. I found the old Hadoop interface more suitable, as it offers the MultipleTextOutputFormat abstract class, which lets you implement generateFileNameForKeyValue(). (The new Hadoop API offers a different multiple-output-file mechanism, MultipleOutputs, which is great if you have predefined output locations, but I did not figure out how to have them generated on the fly from the key-value pair.)
example code:
/*
 * Run example:
 *   hadoop jar dynamicsplit.jar dynamiceventsplit.DynamicEventSplitMultifileMapReduce /event/us/incoming/2013-01-01-01/ event 2013-01-01-01 2 "[a-zA-Z0-9_ ]+" "/event/dynamicsplit1/" ","
 *
 * NOTE(review): the example above passes seven arguments, but main() reads
 * eight (input, outputDir, site, date, eventNameColumn, eventNameRegexp,
 * basePath, columnDelimiter). One argument is missing from the example;
 * confirm which one before copying it.
 */
package dynamiceventsplit;

import java.io.*;
import java.util.*;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapred.lib.*;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

/**
 * Map-reduce job that splits delimited log lines into one output directory
 * per event name. The event name is read from a configurable column of each
 * line and validated against a configurable regular expression.
 */
public class DynamicEventSplitMultifileMapReduce {

    /**
     * Emits each valid input line keyed by its event name. Lines that are too
     * short to contain the event-name column, or whose event name does not
     * match the configured regular expression, are dropped.
     */
    static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
        /** Compiled once per task instead of once per record. */
        private Pattern eventNamePattern;
        /** 1-based index of the column holding the event name. */
        private int eventNameColumnNumber;
        /** Passed to String.split, so it is interpreted as a regular expression. */
        private String columnDelimiter = ",";

        /**
         * Reads the "eventnameregexp", "eventnamecolumnnumber" and
         * "columndelimeter" settings from the job configuration.
         *
         * @throws IllegalArgumentException if the regexp is missing or the
         *         column number is not a positive integer
         */
        @Override
        public void configure(JobConf job) {
            String regexp = job.get("eventnameregexp");
            if (regexp == null) {
                throw new IllegalArgumentException("eventnameregexp is not set");
            }
            eventNamePattern = Pattern.compile(regexp);

            eventNameColumnNumber = Integer.parseInt(job.get("eventnamecolumnnumber"));
            if (eventNameColumnNumber < 1) {
                throw new IllegalArgumentException(
                        "eventnamecolumnnumber must be >= 1 but was " + eventNameColumnNumber);
            }

            // Keep the "," default when the setting is absent instead of overwriting it with null.
            columnDelimiter = job.get("columndelimeter", columnDelimiter);
        }

        /**
         * Splits the line, validates the event name and emits
         * (event name, whole line).
         */
        @Override
        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {
            // Check that the expected event-name field exists.
            String[] columns = value.toString().split(columnDelimiter);
            if (columns.length < eventNameColumnNumber) {
                return;
            }

            // Check that the event name is valid.
            String eventName = columns[eventNameColumnNumber - 1];
            if (!eventNamePattern.matcher(eventName).matches()) {
                return;
            }

            // Key by the validated column. The previous code always used
            // column index 1, which ignored the configured column number and
            // could index past the end of a one-column record.
            output.collect(new Text(eventName), value);
        }
    }

    /** Identity reducer: forwards every (key, value) pair unchanged. */
    static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {
            while (values.hasNext()) {
                output.collect(key, values.next());
            }
        }
    }

    /**
     * Output format that derives the output file name from the record key
     * (the event name) plus the site and date taken from the job
     * configuration.
     */
    static class MultiFileOutput extends MultipleTextOutputFormat<Text, Text> {
        private String site;
        private String year;
        private String month;
        private String day;
        private String hour;
        private String basePath;

        /**
         * Captures "site", "date" and "basepath" from the job configuration
         * before delegating to the parent implementation.
         *
         * @throws IOException if a required setting is missing or the date is
         *         too short to hold year, month, day and hour
         */
        @Override
        public RecordWriter<Text, Text> getRecordWriter(FileSystem fs, JobConf job, String name,
                Progressable arg3) throws IOException {
            site = job.get("site");
            basePath = job.get("basepath");
            String eventDate = job.get("date");
            if (site == null || basePath == null || eventDate == null) {
                throw new IOException("site, date and basepath must all be set in the job configuration");
            }
            // Fixed character positions: yyyy-MM-dd-HH (13 characters).
            if (eventDate.length() < 13) {
                throw new IOException("date must look like yyyy-MM-dd-HH but was '" + eventDate + "'");
            }
            year = eventDate.substring(0, 4);
            month = eventDate.substring(5, 7);
            day = eventDate.substring(8, 10);
            hour = eventDate.substring(11, 13);

            return super.getRecordWriter(fs, job, name, arg3);
        }

        /**
         * Builds the partition-style path for a record. basePath is
         * concatenated as-is, so it must end with a path separator.
         */
        @Override
        protected String generateFileNameForKeyValue(Text key, Text value, String leaf) {
            String eventName = key.toString();
            return basePath + "event=" + eventName + "/site=" + site + "/year=" + year
                    + "/month=" + month + "/day=" + day + "/hour=" + hour + "/" + leaf;
        }

        /**
         * Replaces the key with null. Presumably this keeps the event name
         * out of the written line so only the original log line is stored;
         * confirm against the text output format's null-key handling.
         */
        @Override
        protected Text generateActualKey(Text key, Text value) {
            return null;
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args input path(s), output dir, site, date (yyyy-MM-dd-HH),
     *        1-based event-name column, event-name regexp, base path,
     *        column delimiter
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 8) {
            System.err.println("Usage: DynamicEventSplitMultifileMapReduce <input> <outputDir> <site>"
                    + " <date yyyy-MM-dd-HH> <eventNameColumn> <eventNameRegexp> <basePath>"
                    + " <columnDelimiter>");
            System.exit(2);
        }

        String inputFiles = args[0];
        String outputDir = args[1];
        String siteStr = args[2];
        String dateStr = args[3];
        String eventNameColumnNumber = args[4];
        String eventNameRegexp = args[5];
        String basePath = args[6];
        String columnDelimiter = args[7];

        Configuration myCon = new Configuration();
        JobConf conf = new JobConf(myCon, DynamicEventSplitMultifileMapReduce.class);
        conf.set("site", siteStr);
        conf.set("date", dateStr);

        conf.setOutputKeyClass(Text.class);
        conf.setMapOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(MultiFileOutput.class);

        // NOTE(review): speculative execution is turned off, presumably so
        // duplicate task attempts do not write the same files under basePath;
        // confirm before re-enabling.
        conf.setMapSpeculativeExecution(false);
        conf.setReduceSpeculativeExecution(false);

        FileInputFormat.setInputPaths(conf, inputFiles);
        FileOutputFormat.setOutputPath(conf, new Path("/" + outputDir + siteStr + dateStr + "/"));

        conf.set("eventnamecolumnnumber", eventNameColumnNumber);
        conf.set("eventnameregexp", eventNameRegexp);
        conf.set("basepath", basePath);
        conf.set("columndelimeter", columnDelimiter);

        JobClient.runJob(conf);
    }
}
Comments
Post a Comment