testing - Programmatically creating DStreams in Apache Spark


I am writing some self-contained integration tests around Apache Spark Streaming. I want to test that my code can ingest all kinds of edge cases in my simulated test data. When I was doing this with regular RDDs (not streaming), I could use my inline data and call "parallelize" on it to turn it into a Spark RDD. However, I can find no such method for creating DStreams. Ideally I would like to call some "push" function once in a while and have the tuple magically appear in my DStream. At the moment I'm doing this with Apache Kafka: I create a temporary queue and write to it. But this seems like overkill. I'd much rather create the test DStream directly from my test data without having to use Kafka as a mediator.

For testing purposes, you can create an input stream from a queue of RDDs. Pushing more RDDs into the queue simulates having processed more events in the batch interval.

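    import scala.collection.mutable
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.InputDStream

    // SparkContextHolder and assertSomething are the answerer's own helpers, not Spark APIs
    val sc = SparkContextHolder.sc
    val ssc = new StreamingContext(sc, Seconds(1))
    val inputData: mutable.Queue[RDD[Int]] = mutable.Queue()
    val inputStream: InputDStream[Int] = ssc.queueStream(inputData)

    inputData += sc.makeRDD(List(1, 2)) // Emulate the RDD created during the first batch interval
    inputData += sc.makeRDD(List(3, 4)) // 2nd batch interval
    // etc

    val result = inputStream.map(x => x * x)
    result.foreachRDD(rdd => assertSomething(rdd))
    ssc.start() // Don't forget to start the streaming context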
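The snippet above starts the streaming context but does not show how a test might wait for the batches and then shut down. Below is a minimal sketch of one way to do that. It assumes a local master (`local[2]`) and a one-second batch interval. The names `QueueStreamSketch` and `squaresPerBatch` are hypothetical, introduced only for illustration. Because it waits on wall-clock time, the timeout is a guess and may need tuning on a slow machine.

```scala
import scala.collection.mutable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical helper object, not part of Spark or of the original answer.
object QueueStreamSketch {

  // Feeds each inner Seq as one RDD (one per batch interval), squares every
  // element, and returns the non-empty results collected per batch.
  def squaresPerBatch(batches: Seq[Seq[Int]]): Seq[Seq[Int]] = {
    // Assumption: a throwaway local context is acceptable for the test.
    val conf = new SparkConf().setMaster("local[2]").setAppName("queue-stream-sketch")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))
    try {
      val queue = mutable.Queue[RDD[Int]]()
      batches.foreach(batch => queue += sc.makeRDD(batch))

      val collected = mutable.ArrayBuffer[Seq[Int]]()
      // oneAtATime = true (the default) dequeues a single RDD per batch interval.
      ssc.queueStream(queue, oneAtATime = true)
        .map(x => x * x)
        .foreachRDD { rdd =>
          val values = rdd.collect().toSeq
          // Intervals with an empty queue produce empty RDDs; skip those.
          if (values.nonEmpty) collected.synchronized { collected += values }
        }

      ssc.start()
      // Wait roughly one interval per queued RDD plus some slack (a guess).
      ssc.awaitTerminationOrTimeout(batches.size * 1000L + 3000L)
      collected.synchronized { collected.toList }
    } finally {
      // Stop streaming and the underlying SparkContext so tests do not leak contexts.
      ssc.stop(stopSparkContext = true, stopGracefully = false)
    }
  }
}
```

A test could then call `QueueStreamSketch.squaresPerBatch(Seq(Seq(1, 2), Seq(3, 4)))` and assert on the returned sequences, each of which holds the squares from one batch. Collecting results on the driver and asserting after the context has stopped tends to surface failures more clearly than asserting inside `foreachRDD`, where an exception is raised on the streaming scheduler rather than in the test thread.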
