Spark
- オープンソースの分散処理フレームワーク
- 分散処理で有名な Hadoopは、独自ファイルシステム(hdfs)を介して処理
- Spark は、RDD(Resilient Distributed Dataset)という障害耐久性分散可能なデータセットを、オンメモリで実行できるため、高速な処理が可能
Sparkの構成
- Spark Core Spark
- RDDを提供
- Spark Streaming
- データストリーム処理を提供
- Twitterなどのリアルタイムデータを取得する場合などに使える
- Spark SQL
- 構造化データにアクセスする機能を提供
- JSONなども扱える
- Mlib
- 汎用的な機械学習ライブラリを提供
- word2vecを使った類似後分類とかできる
- Graph X
- ソーシャルグラフを扱う場面で役にたつ
サンプルのアプリケーション作成
https://blog.excite.co.jp/exdev/27638387/
環境構築
- Spark アプリケーションは、Java、Scala、Python、R の4言語で記述することが可能
- Spark の実装に Scala が用いられている
インストール
$ brew intall apache-spark
$ brew intall scala
$ brew intall sbt
環境変数に Sparkを通す (x.y.z は、バージョン)
vi ~/.bashrc
# ADD
export Spark_HOME=Spark_HOME-/usr/local/Celler/apache-spark/x.y.z
ビルド環境の構築
WordCount/
├── build.sbt(ビルド方法を定義するsbtファイル)
├── input.txt(ワードカウントする対象の入力ファイル)
├── output/ (ワードカウントの結果)
│
├── project/ (sbtの追加設定を入れるファイル)
│ └── assembly.sbt (sbtのプラグイン)
└── src/
└──main/
└── scala/
└── jp/
└── excite/
└── news/
└── WordCountApp.scala
build.sbt
name := "WordCountApp" version := "1.0.0" scalaVersion := "2.11.8" resolvers += "Atilika Open Source repository" at "http://www.atilika.org/nexus/content/repositories/atilika" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0" libraryDependencies += "org.atilika.kuromoji" % "kuromoji" % "0.7.7" assemblyMergeStrategy in assembly := { case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first case PathList(ps @ _*) if ps.last endsWith ".properties" => MergeStrategy.first case PathList(ps @ _*) if ps.last endsWith ".xml" => MergeStrategy.first case PathList(ps @ _*) if ps.last endsWith ".types" => MergeStrategy.first case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first case "application.conf" => MergeStrategy.concat case "unwanted.txt" => MergeStrategy.discard case x => val oldStrategy = (assemblyMergeStrategy in assembly).value oldStrategy(x) } mainClass in assembly := Some("WordCountApp")
project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")
- 生成するjarファイルに依存するパッケージも全部入る
- 並列処理を行うためのクラスタ環境へのデプロイ(spark-submit)が1つのファイル配布のみで済む
- コンフリクトを解消するMergeStrategyを使っている
サンプル実装
package jp.excite.news
import java.util.regex.{Matcher, Pattern}
import scala.collection.convert.WrapAsScala._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext
import org.atilika.kuromoji.Tokenizer
import org.atilika.kuromoji.Token
object WordCountApp{
def main(args: Array[String]) {
//スパークの環境設定
val sparkConf = new SparkConf().setMaster("local[4]").setAppName("WordCount App")
val sc = new SparkContext(sparkConf)
//kuromojiのトークナイザ
val tokenizer = Tokenizer.builder.mode(Tokenizer.Mode.NORMAL).build()
//テキストファイルから1行ずつ読み込み。名詞を配列に分解する。
//テキストファイルからRDDオブジェクトを取得する。
val input = sc.textFile("input.txt").flatMap(line => {
val tokens : java.util.List[Token] = Tokenizer.builder().build().tokenize(line)
val output : scala.collection.mutable.ArrayBuffer[String] = new collection.mutable.ArrayBuffer[String]()
tokens.foreach(token => {
if(token.getAllFeatures().indexOf("名詞") != -1) {
output += token.getSurfaceForm()
}})
output// return
})
//ワードカウントを行う。数える名詞をキーにし、キーを元に加算処理を行う。
val wordCounts = input.map(x => (x, 1L)).reduceByKey((x, y)=> x + y)
//降順に単語を列挙して出力する。
val wordCounts = input.map(x => (x, 1L)).reduceByKey((x, y)=> x + y)
val output = wordCounts.map( x => (x._2, x._1)).sortByKey(false).saveAsTextFile("ouput")
}
}
処理フロー
- input.txt1行ずつ入力 (ex 今日は晴れ。明日も晴れ。)
- 読み込んだ文字列を形態素解析で分解し名詞のみ配列に。(ex [今日, 晴れ, 明日, 晴れ])
- 数えるために名詞をキーとした(名詞, 1)タプル(組み)に変換 (ex [(今日,1), (晴れ,1), (明日,1), (晴れ,1)])
- 同じ名詞(キー)に対して加算をする。(ex [(今日,1), (晴れ,2), (明日,1)])
- キーでソートするためにキーと値を逆にする。(ex [(1,今日), (2,晴れ), (1,明日)])
- キーでソートする(デフォルトは昇順) (ex [(2,晴れ), (1,今日) ,(1,明日)])
- 上位10個のデータを取得し、それぞれを出力。
- これらの処理クロージャーを使ってふるまいををRDD変換関数
- map (別の型への変換)
- flatMap (別の型への変換。要素数は複数可)
- reduceByKey (キー毎に要素同士を演算する)
- etc…
- に記述することで分散処理を比較的簡単に書くことができます。
実行
- input.txt は、Wikipedia から引用 (https://ja.wikipedia.org/wiki/%E6%A9%9F%E6%A2%B0%E5%AD%A6%E7%BF%92)
$ sbt run
$ sbt assembly
Sparkのクラスタ環境へのデプロイ(spark-submit)用のjarファイルが作られる
output以下にpart-00000x(xは数字)のファイルが作られている