I'm getting this error:
java.lang.Exception: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1074)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:125)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

from this code:
public class MyAvroApplication {

    private static class MyReducer extends Reducer<Email, Text, Text, Text> {
        private Text from = new Text();
        private Text subject = new Text();

        public void reduce(Email key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            from.set(key.getFrom().toString());
            subject.set(key.getSubject().toString());
            context.write(from, subject);
        }
    }

    private static class MyAvroMapper extends Mapper<Text, Email, Email, Text> {
        protected void map(Email email, NullWritable value, Context context) throws IOException, InterruptedException {
            context.write(email, new Text(email.getSubject().toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs");
        ugi.doAs(new PrivilegedExceptionAction<Void>() {
            public Void run() throws Exception {
                Configuration conf = new Configuration();
                conf.set("fs.defaultFS", "hdfs://127.0.0.1:8020/user/rich");
                conf.set("hadoop.job.ugi", "hdfs");
                Job job = Job.getInstance(conf, "mytest");
                AvroJob.setInputKeySchema(job, Email.getClassSchema());
                AvroJob.setOutputKeySchema(job, Email.getClassSchema());
                job.setJarByClass(MyApplication.class);
                job.setMapperClass(MyAvroMapper.class);
                job.setCombinerClass(MyReducer.class);
                job.setReducerClass(MyReducer.class);
                // job.setInputFormatClass(AvroKeyInputFormat.class);
                // job.setOutputFormatClass(AvroKeyOutputFormat.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                FileInputFormat.addInputPath(job, new Path("/user/rich/in/"));
                FileOutputFormat.setOutputPath(job, new Path("/user/rich/out/"));
                boolean result = job.waitForCompletion(true);
                System.out.println(result);
                return null;
            }
        });
    }
}

I'm struggling to understand how to match the various key-value type parameters on the way in and out everywhere. My intention is to read Avro-format files in as Email records and run some simple calculations, counting the number of emails to start.
Edit:
A similar error occurs with this mapper:
private static class MyAvroMapper extends Mapper<Text, AvroKey<Email>, AvroKey<Email>, Text> {
    protected void map(Text key, AvroKey<Email> email, Context context) throws IOException, InterruptedException {
        context.write(email, new Text(email.datum().getSubject().toString()));
    }
}

And the error:
java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text
    at com.test.myapp.MyAvroApplication$MyAvroMapper.map(MyAvroApplication.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

The Email class is generated from the schema:
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Email extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Email\",\"namespace\":\"com.test.myapp.avro\",\"fields\":[{\"name\":\"message_id\",\"type\":[\"null\",\"string\"],\"doc\":\"\"},{\"name\":\"date\",\"type\":[\"long\",\"null\"]},{\"name\":\"from\",\"type\":{\"type\":\"record\",\"name\":\"EmailAddress\",\"fields\":[{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"\"},{\"name\":\"address\",\"type\":[\"null\",\"string\"],\"doc\":\"\"}]}},{\"name\":\"subject\",\"type\":[\"string\",\"null\"]},{\"name\":\"body\",\"type\":[\"string\",\"null\"]},{\"name\":\"tos\",\"type\":[\"null\",{\"type\":\"array\",\"items\":[\"null\",\"EmailAddress\"]}],\"doc\":\"\"},{\"name\":\"ccs\",\"type\":[\"null\",{\"type\":\"array\",\"items\":[\"null\",\"EmailAddress\"]}],\"doc\":\"\"},{\"name\":\"bccs\",\"type\":[\"null\",{\"type\":\"array\",\"items\":[\"null\",\"EmailAddress\"]}],\"doc\":\"\"}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

  /** */
  @Deprecated public java.lang.CharSequence message_id;
  @Deprecated public java.lang.Long date;
  @Deprecated public com.test.myapp.avro.EmailAddress from;
  @Deprecated public java.lang.CharSequence subject;
  @Deprecated public java.lang.CharSequence body;
  /** */
  @Deprecated public java.util.List<com.test.myapp.avro.EmailAddress> tos;
  /** */
  @Deprecated public java.util.List<com.test.myapp.avro.EmailAddress> ccs;
  /** */
  @Deprecated public java.util.List<com.test.myapp.avro.EmailAddress> bccs;

  /**
   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use <code>newBuilder()</code>.
   */
  public Email() {}

  /**
   * All-args constructor.
   */
  public Email(java.lang.CharSequence message_id, java.lang.Long date, com.test.myapp.avro.EmailAddress from, java.lang.CharSequence subject, java.lang.CharSequence body, java.util.List<com.test.myapp.avro.EmailAddress> tos, java.util.List<com.test.myapp.avro.EmailAddress> ccs, java.util.List<com.test.myapp.avro.EmailAddress> bccs) {
    this.message_id = message_id;
    this.date = date;
    this.from = from;
    this.subject = subject;
    this.body = body;
    this.tos = tos;
    this.ccs = ccs;
    this.bccs = bccs;
  }

  public org.apache.avro.Schema getSchema() { return SCHEMA$; }

  // Used by DatumWriter.  Applications should not call.
  public java.lang.Object get(int field$) {
    switch (field$) {
    case 0: return message_id;
    case 1: return date;
    case 2: return from;
    case 3: return subject;
    case 4: return body;
    case 5: return tos;
    case 6: return ccs;
    case 7: return bccs;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  // Used by DatumReader.  Applications should not call.
  @SuppressWarnings(value="unchecked")
  public void put(int field$, java.lang.Object value$) {
    switch (field$) {
    case 0: message_id = (java.lang.CharSequence)value$; break;
    case 1: date = (java.lang.Long)value$; break;
    case 2: from = (com.test.myapp.avro.EmailAddress)value$; break;
    case 3: subject = (java.lang.CharSequence)value$; break;
    case 4: body = (java.lang.CharSequence)value$; break;
    case 5: tos = (java.util.List<com.test.myapp.avro.EmailAddress>)value$; break;
    case 6: ccs = (java.util.List<com.test.myapp.avro.EmailAddress>)value$; break;
    case 7: bccs = (java.util.List<com.test.myapp.avro.EmailAddress>)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  /**
   * Gets the value of the 'message_id' field.
   */
  public java.lang.CharSequence getMessageId() { return message_id; }

  /**
   * Sets the value of the 'message_id' field.
   * @param value the value to set.
   */
  public void setMessageId(java.lang.CharSequence value) { this.message_id = value; }

  /**
   * Gets the value of the 'date' field.
   */
  public java.lang.Long getDate() { return date; }

  /**
   * Sets the value of the 'date' field.
   * @param value the value to set.
   */
  public void setDate(java.lang.Long value) { this.date = value; }

  /**
   * Gets the value of the 'from' field.
   */
  public com.test.myapp.avro.EmailAddress getFrom() { return from; }

  /**
   * Sets the value of the 'from' field.
   * @param value the value to set.
   */
  public void setFrom(com.test.myapp.avro.EmailAddress value) { this.from = value; }

  /**
   * Gets the value of the 'subject' field.
   */
  public java.lang.CharSequence getSubject() { return subject; }

  /**
   * Sets the value of the 'subject' field.
   * @param value the value to set.
   */
  public void setSubject(java.lang.CharSequence value) { this.subject = value; }

  /**
   * Gets the value of the 'body' field.
   */
  public java.lang.CharSequence getBody() { return body; }

  /**
   * Sets the value of the 'body' field.
   * @param value the value to set.
   */
  public void setBody(java.lang.CharSequence value) { this.body = value; }

  /**
   * Gets the value of the 'tos' field.
   */
  public java.util.List<com.test.myapp.avro.EmailAddress> getTos() { return tos; }

  /**
   * Sets the value of the 'tos' field.
   * @param value the value to set.
   */
  public void setTos(java.util.List<com.test.myapp.avro.EmailAddress> value) { this.tos = value; }

  /**
   * Gets the value of the 'ccs' field.
   */
  public java.util.List<com.test.myapp.avro.EmailAddress> getCcs() { return ccs; }

  /**
   * Sets the value of the 'ccs' field.
   * @param value the value to set.
   */
  public void setCcs(java.util.List<com.test.myapp.avro.EmailAddress> value) { this.ccs = value; }

  /**
   * Gets the value of the 'bccs' field.
   */
  public java.util.List<com.test.myapp.avro.EmailAddress> getBccs() { return bccs; }

  /**
   * Sets the value of the 'bccs' field.
   * @param value the value to set.
   */
  public void setBccs(java.util.List<com.test.myapp.avro.EmailAddress> value) { this.bccs = value; }

  /** Creates a new Email RecordBuilder */
  public static com.test.myapp.avro.Email.Builder newBuilder() {
    return new com.test.myapp.avro.Email.Builder();
  }

  /** Creates a new Email RecordBuilder by copying an existing Builder */
  public static com.test.myapp.avro.Email.Builder newBuilder(com.test.myapp.avro.Email.Builder other) {
    return new com.test.myapp.avro.Email.Builder(other);
  }

  /** Creates a new Email RecordBuilder by copying an existing Email instance */
  public static com.test.myapp.avro.Email.Builder newBuilder(com.test.myapp.avro.Email other) {
    return new com.test.myapp.avro.Email.Builder(other);
  }

  /**
   * RecordBuilder for Email instances.
   */
  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Email>
    implements org.apache.avro.data.RecordBuilder<Email> {

    private java.lang.CharSequence message_id;
    private java.lang.Long date;
    private com.test.myapp.avro.EmailAddress from;
    private java.lang.CharSequence subject;
    private java.lang.CharSequence body;
    private java.util.List<com.test.myapp.avro.EmailAddress> tos;
    private java.util.List<com.test.myapp.avro.EmailAddress> ccs;
    private java.util.List<com.test.myapp.avro.EmailAddress> bccs;

    /** Creates a new Builder */
    private Builder() {
      super(com.test.myapp.avro.Email.SCHEMA$);
    }

    /** Creates a Builder by copying an existing Builder */
    private Builder(com.test.myapp.avro.Email.Builder other) {
      super(other);
      if (isValidValue(fields()[0], other.message_id)) {
        this.message_id = data().deepCopy(fields()[0].schema(), other.message_id);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.date)) {
        this.date = data().deepCopy(fields()[1].schema(), other.date);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.from)) {
        this.from = data().deepCopy(fields()[2].schema(), other.from);
        fieldSetFlags()[2] = true;
      }
      if (isValidValue(fields()[3], other.subject)) {
        this.subject = data().deepCopy(fields()[3].schema(), other.subject);
        fieldSetFlags()[3] = true;
      }
      if (isValidValue(fields()[4], other.body)) {
        this.body = data().deepCopy(fields()[4].schema(), other.body);
        fieldSetFlags()[4] = true;
      }
      if (isValidValue(fields()[5], other.tos)) {
        this.tos = data().deepCopy(fields()[5].schema(), other.tos);
        fieldSetFlags()[5] = true;
      }
      if (isValidValue(fields()[6], other.ccs)) {
        this.ccs = data().deepCopy(fields()[6].schema(), other.ccs);
        fieldSetFlags()[6] = true;
      }
      if (isValidValue(fields()[7], other.bccs)) {
        this.bccs = data().deepCopy(fields()[7].schema(), other.bccs);
        fieldSetFlags()[7] = true;
      }
    }

    /** Creates a Builder by copying an existing Email instance */
    private Builder(com.test.myapp.avro.Email other) {
      super(com.test.myapp.avro.Email.SCHEMA$);
      if (isValidValue(fields()[0], other.message_id)) {
        this.message_id = data().deepCopy(fields()[0].schema(), other.message_id);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.date)) {
        this.date = data().deepCopy(fields()[1].schema(), other.date);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.from)) {
        this.from = data().deepCopy(fields()[2].schema(), other.from);
        fieldSetFlags()[2] = true;
      }
      if (isValidValue(fields()[3], other.subject)) {
        this.subject = data().deepCopy(fields()[3].schema(), other.subject);
        fieldSetFlags()[3] = true;
      }
      if (isValidValue(fields()[4], other.body)) {
        this.body = data().deepCopy(fields()[4].schema(), other.body);
        fieldSetFlags()[4] = true;
      }
      if (isValidValue(fields()[5], other.tos)) {
        this.tos = data().deepCopy(fields()[5].schema(), other.tos);
        fieldSetFlags()[5] = true;
      }
      if (isValidValue(fields()[6], other.ccs)) {
        this.ccs = data().deepCopy(fields()[6].schema(), other.ccs);
        fieldSetFlags()[6] = true;
      }
      if (isValidValue(fields()[7], other.bccs)) {
        this.bccs = data().deepCopy(fields()[7].schema(), other.bccs);
        fieldSetFlags()[7] = true;
      }
    }

    /** Gets the value of the 'message_id' field */
    public java.lang.CharSequence getMessageId() { return message_id; }

    /** Sets the value of the 'message_id' field */
    public com.test.myapp.avro.Email.Builder setMessageId(java.lang.CharSequence value) {
      validate(fields()[0], value);
      this.message_id = value;
      fieldSetFlags()[0] = true;
      return this;
    }

    /** Checks whether the 'message_id' field has been set */
    public boolean hasMessageId() { return fieldSetFlags()[0]; }

    /** Clears the value of the 'message_id' field */
    public com.test.myapp.avro.Email.Builder clearMessageId() {
      message_id = null;
      fieldSetFlags()[0] = false;
      return this;
    }

    /** Gets the value of the 'date' field */
    public java.lang.Long getDate() { return date; }

    /** Sets the value of the 'date' field */
    public com.test.myapp.avro.Email.Builder setDate(java.lang.Long value) {
      validate(fields()[1], value);
      this.date = value;
      fieldSetFlags()[1] = true;
      return this;
    }

    /** Checks whether the 'date' field has been set */
    public boolean hasDate() { return fieldSetFlags()[1]; }

    /** Clears the value of the 'date' field */
    public com.test.myapp.avro.Email.Builder clearDate() {
      date = null;
      fieldSetFlags()[1] = false;
      return this;
    }

    /** Gets the value of the 'from' field */
    public com.test.myapp.avro.EmailAddress getFrom() { return from; }

    /** Sets the value of the 'from' field */
    public com.test.myapp.avro.Email.Builder setFrom(com.test.myapp.avro.EmailAddress value) {
      validate(fields()[2], value);
      this.from = value;
      fieldSetFlags()[2] = true;
      return this;
    }

    /** Checks whether the 'from' field has been set */
    public boolean hasFrom() { return fieldSetFlags()[2]; }

    /** Clears the value of the 'from' field */
    public com.test.myapp.avro.Email.Builder clearFrom() {
      from = null;
      fieldSetFlags()[2] = false;
      return this;
    }

    /** Gets the value of the 'subject' field */
    public java.lang.CharSequence getSubject() { return subject; }

    /** Sets the value of the 'subject' field */
    public com.test.myapp.avro.Email.Builder setSubject(java.lang.CharSequence value) {
      validate(fields()[3], value);
      this.subject = value;
      fieldSetFlags()[3] = true;
      return this;
    }

    /** Checks whether the 'subject' field has been set */
    public boolean hasSubject() { return fieldSetFlags()[3]; }

    /** Clears the value of the 'subject' field */
    public com.test.myapp.avro.Email.Builder clearSubject() {
      subject = null;
      fieldSetFlags()[3] = false;
      return this;
    }

    /** Gets the value of the 'body' field */
    public java.lang.CharSequence getBody() { return body; }

    /** Sets the value of the 'body' field */
    public com.test.myapp.avro.Email.Builder setBody(java.lang.CharSequence value) {
      validate(fields()[4], value);
      this.body = value;
      fieldSetFlags()[4] = true;
      return this;
    }

    /** Checks whether the 'body' field has been set */
    public boolean hasBody() { return fieldSetFlags()[4]; }

    /** Clears the value of the 'body' field */
    public com.test.myapp.avro.Email.Builder clearBody() {
      body = null;
      fieldSetFlags()[4] = false;
      return this;
    }

    /** Gets the value of the 'tos' field */
    public java.util.List<com.test.myapp.avro.EmailAddress> getTos() { return tos; }

    /** Sets the value of the 'tos' field */
    public com.test.myapp.avro.Email.Builder setTos(java.util.List<com.test.myapp.avro.EmailAddress> value) {
      validate(fields()[5], value);
      this.tos = value;
      fieldSetFlags()[5] = true;
      return this;
    }

    /** Checks whether the 'tos' field has been set */
    public boolean hasTos() { return fieldSetFlags()[5]; }

    /** Clears the value of the 'tos' field */
    public com.test.myapp.avro.Email.Builder clearTos() {
      tos = null;
      fieldSetFlags()[5] = false;
      return this;
    }

    /** Gets the value of the 'ccs' field */
    public java.util.List<com.test.myapp.avro.EmailAddress> getCcs() { return ccs; }

    /** Sets the value of the 'ccs' field */
    public com.test.myapp.avro.Email.Builder setCcs(java.util.List<com.test.myapp.avro.EmailAddress> value) {
      validate(fields()[6], value);
      this.ccs = value;
      fieldSetFlags()[6] = true;
      return this;
    }

    /** Checks whether the 'ccs' field has been set */
    public boolean hasCcs() { return fieldSetFlags()[6]; }

    /** Clears the value of the 'ccs' field */
    public com.test.myapp.avro.Email.Builder clearCcs() {
      ccs = null;
      fieldSetFlags()[6] = false;
      return this;
    }

    /** Gets the value of the 'bccs' field */
    public java.util.List<com.test.myapp.avro.EmailAddress> getBccs() { return bccs; }

    /** Sets the value of the 'bccs' field */
    public com.test.myapp.avro.Email.Builder setBccs(java.util.List<com.test.myapp.avro.EmailAddress> value) {
      validate(fields()[7], value);
      this.bccs = value;
      fieldSetFlags()[7] = true;
      return this;
    }

    /** Checks whether the 'bccs' field has been set */
    public boolean hasBccs() { return fieldSetFlags()[7]; }

    /** Clears the value of the 'bccs' field */
    public com.test.myapp.avro.Email.Builder clearBccs() {
      bccs = null;
      fieldSetFlags()[7] = false;
      return this;
    }

    @Override
    public Email build() {
      try {
        Email record = new Email();
        record.message_id = fieldSetFlags()[0] ? this.message_id : (java.lang.CharSequence) defaultValue(fields()[0]);
        record.date = fieldSetFlags()[1] ? this.date : (java.lang.Long) defaultValue(fields()[1]);
        record.from = fieldSetFlags()[2] ? this.from : (com.test.myapp.avro.EmailAddress) defaultValue(fields()[2]);
        record.subject = fieldSetFlags()[3] ? this.subject : (java.lang.CharSequence) defaultValue(fields()[3]);
        record.body = fieldSetFlags()[4] ? this.body : (java.lang.CharSequence) defaultValue(fields()[4]);
        record.tos = fieldSetFlags()[5] ? this.tos : (java.util.List<com.test.myapp.avro.EmailAddress>) defaultValue(fields()[5]);
        record.ccs = fieldSetFlags()[6] ? this.ccs : (java.util.List<com.test.myapp.avro.EmailAddress>) defaultValue(fields()[6]);
        record.bccs = fieldSetFlags()[7] ? this.bccs : (java.util.List<com.test.myapp.avro.EmailAddress>) defaultValue(fields()[7]);
        return record;
      } catch (Exception e) {
        throw new org.apache.avro.AvroRuntimeException(e);
      }
    }
  }
}
I think there's a problem in the mapper. When you declare:
private static class MyAvroMapper extends Mapper<Text, Email, Email, Text> {

you're telling Hadoop to expect the pair (Text, Email) as the types of the input key and value, and the pair (Email, Text) as the types of the key and value that the mapper emits (note the four types declared as generics of the class).
But when you write the signature like this:
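protected void map(Email email, NullWritable value, Context context) throws IOException, InterruptedException {

you're saying that the input key and value types are Email and NullWritable, which contradicts the class declaration; hence the exception. Because the parameter types don't match the declared generics, this method overloads Mapper.map instead of overriding it, so the inherited identity map runs in its place (the first stack trace shows Mapper.map calling context.write). With the input format lines commented out, the job reads with the default TextInputFormat, whose keys are LongWritable, and that is the key type that reaches an output expecting Text.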
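The overload-versus-override effect can be reproduced without Hadoop. The sketch below is only an illustration: SimpleMapper is a hypothetical stand-in with an identity default, not Hadoop's Mapper, and the Long/String/Integer types merely play the roles of LongWritable/Text/IntWritable. It shows that a map method whose parameters disagree with the class generics compiles fine but is never called, while a matching signature marked @Override is.

```java
import java.util.ArrayList;
import java.util.List;

public class OverloadPitfallDemo {

    /** Hypothetical stand-in for a framework mapper; NOT Hadoop's real class. */
    static class SimpleMapper<KI, VI, KO, VO> {
        protected final List<String> output = new ArrayList<>();

        /** Default behaviour: pass the input pair straight through (identity). */
        @SuppressWarnings("unchecked")
        protected void map(KI key, VI value) {
            emit((KO) key, (VO) value);
        }

        /** Records the runtime types of whatever was emitted. */
        protected void emit(KO key, VO value) {
            output.add(key.getClass().getSimpleName() + "/" + value.getClass().getSimpleName());
        }

        /** What the framework does: it always calls map(KI, VI). */
        final List<String> run(KI key, VI value) {
            map(key, value);
            return output;
        }
    }

    /** Parameters disagree with the generics, so this is an overload that is never invoked. */
    static class BrokenMapper extends SimpleMapper<Long, String, String, Integer> {
        protected void map(String line, Integer ignored) {
            emit(line, line.length());
        }
    }

    /** Parameters match <KI, VI>; @Override makes the compiler verify that. */
    static class FixedMapper extends SimpleMapper<Long, String, String, Integer> {
        @Override
        protected void map(Long offset, String line) {
            emit(line, line.length());
        }
    }

    public static void main(String[] args) {
        // The identity default ran: the input types leak into the output.
        System.out.println(new BrokenMapper().run(42L, "hello")); // prints [Long/String]
        // The override ran: the declared output types are emitted.
        System.out.println(new FixedMapper().run(42L, "hello"));  // prints [String/Integer]
    }
}
```

The same rule applies in the real job: the first two parameters of map must be exactly the first two generic types of the Mapper, and adding @Override turns a mismatch into a compile error instead of a runtime surprise. If the job reads with AvroKeyInputFormat, the input pair it delivers is (AvroKey<Email>, NullWritable), so both the generics and the map signature would need to start with those two types.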