Apache Spark for Scala 使ってみた
2018-11-01 (Thu) · 370 words

Spark

Sparkの構成

サンプルのアプリケーション作成

https://blog.excite.co.jp/exdev/27638387/

環境構築

インストール

$ brew intall apache-spark
$ brew intall scala
$ brew intall sbt

環境変数に Sparkを通す (x.y.z は、バージョン)

vi ~/.bashrc
# ADD
export Spark_HOME=Spark_HOME-/usr/local/Celler/apache-spark/x.y.z

ビルド環境の構築

WordCount/
├── build.sbt(ビルド方法を定義するsbtファイル)
├── input.txt(ワードカウントする対象の入力ファイル)
├── output/ (ワードカウントの結果)
│
├── project/ (sbtの追加設定を入れるファイル)
│   └── assembly.sbt (sbtのプラグイン)
└── src/
   └──main/
     └── scala/
     └── jp/
       └── excite/
        └── news/
          └── WordCountApp.scala

サンプル実装

package jp.excite.news
 
import java.util.regex.{Matcher, Pattern}
import scala.collection.convert.WrapAsScala._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext
import org.atilika.kuromoji.Tokenizer
import org.atilika.kuromoji.Token
 
object WordCountApp{
def main(args: Array[String]) {
    //スパークの環境設定
    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("WordCount App")
    val sc = new SparkContext(sparkConf)
    //kuromojiのトークナイザ
    val tokenizer = Tokenizer.builder.mode(Tokenizer.Mode.NORMAL).build()
    //テキストファイルから1行ずつ読み込み。名詞を配列に分解する。
    //テキストファイルからRDDオブジェクトを取得する。
        val input = sc.textFile("input.txt").flatMap(line => {
        val tokens : java.util.List[Token] = Tokenizer.builder().build().tokenize(line)
        val output : scala.collection.mutable.ArrayBuffer[String] = new collection.mutable.ArrayBuffer[String]()
        tokens.foreach(token => {
            if(token.getAllFeatures().indexOf("名詞") != -1) {
      output += token.getSurfaceForm()
        }})
        output// return
    })
    //ワードカウントを行う。数える名詞をキーにし、キーを元に加算処理を行う。
    val wordCounts = input.map(x => (x, 1L)).reduceByKey((x, y)=> x + y)
    //降順に単語を列挙して出力する。
    val wordCounts = input.map(x => (x, 1L)).reduceByKey((x, y)=> x + y)
    val output = wordCounts.map( x => (x._2, x._1)).sortByKey(false).saveAsTextFile("ouput")
    }
}

処理フロー

実行

$ sbt run
$ sbt assembly

Top     back     Posts     Tags     About Me