{"id":5270,"date":"2022-04-16T17:52:27","date_gmt":"2022-04-16T08:52:27","guid":{"rendered":"https:\/\/www.skyer9.pe.kr\/wordpress\/?p=5270"},"modified":"2022-04-19T15:17:17","modified_gmt":"2022-04-19T06:17:17","slug":"spark-%ec%97%90%ec%84%9c-kafka-topic-%ec%9d%bd%ec%96%b4%ec%98%a4%ea%b8%b0","status":"publish","type":"post","link":"https:\/\/www.skyer9.pe.kr\/wordpress\/?p=5270","title":{"rendered":"Reading a Kafka Topic from Spark"},"content":{"rendered":"<h1>Reading a Kafka Topic from Spark<\/h1>\n<h2>Installing the Plugins<\/h2>\n<p>Install the <code>Scala<\/code> and <code>Big Data Tools<\/code> plugins in IntelliJ.<\/p>\n<h2>Checking the Scala Version<\/h2>\n<pre><code class=\"language-bash\">spark-shell\nUsing Spark&#039;s default log4j profile: org\/apache\/spark\/log4j-defaults.properties\nSetting default log level to &quot;WARN&quot;.\nTo adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n22\/04\/16 09:45:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable\nSpark context Web UI available at http:\/\/ip-172-31-18-128.ap-northeast-2.compute.internal:4040\nSpark context available as &#039;sc&#039; (master = local[*], app id = local-1650102324143).\nSpark session available as &#039;spark&#039;.\nWelcome to\n      ____              __\n     \/ __\/__  ___ _____\/ \/__\n    _\\ \\\/ _ \\\/ _ `\/ __\/  &#039;_\/\n   \/___\/ .__\/\\_,_\/_\/ \/_\/\\_\\   version 3.2.1\n      \/_\/\n\nUsing Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_312)\nType in expressions to have them evaluated.\nType :help for more information.\n\nscala&gt; :quit<\/code><\/pre>\n<h2>Creating the Project<\/h2>\n<p>Select Scala, and choose 8 as the JDK version.<br \/>\nSet the Scala version you confirmed above.<\/p>\n<p><img decoding=\"async\" src=\"https:\/\/www.skyer9.pe.kr\/wordpress\/wp-content\/uploads\/2022\/04\/2022-04-16-11.png\" alt=\"\" \/><\/p>\n<h2>Additional Configuration<\/h2>\n<p>Configure the project as described <a href=\"https:\/\/www.skyer9.pe.kr\/wordpress\/?p=5252\">here<\/a>.<\/p>\n<p>When you run the project, it builds the jar, copies it to the server,<br \/>\nand even runs spark-submit automatically.<\/p>\n<p>For now, just verify that it runs normally.<\/p>\n<pre><code class=\"language-bash\">\/home\/spark\/spark-3.2.1-bin-hadoop3.2\/bin\/spark-submit --master local --deploy-mode client --name Unnamed file:\/\/$HOME\/ReadTopic.jar<\/code><\/pre>\n<h2>Adding Dependencies<\/h2>\n<p>Add the dependencies to build.sbt.<\/p>\n<pre><code class=\"language-scala\">ThisBuild \/ version := &quot;0.1.0-SNAPSHOT&quot;\n\nThisBuild \/ scalaVersion := 
&quot;2.12.15&quot;\n\nval sparkVersion = &quot;3.2.1&quot;\n\nlibraryDependencies ++= Seq(\n  &quot;org.apache.spark&quot; %% &quot;spark-core&quot; % sparkVersion,\n  &quot;org.apache.spark&quot; %% &quot;spark-sql&quot; % sparkVersion\n)\n\nlazy val root = (project in file(&quot;.&quot;))\n  .settings(\n    name := &quot;ReadTopic&quot;\n  )<\/code><\/pre>\n<h2>Modifying the Scala Class<\/h2>\n<p>Edit the Main.scala class as follows.<\/p>\n<pre><code class=\"language-scala\">import org.apache.spark.sql.SparkSession\n\nobject ReadTopic {\n  def main(args: Array[String]): Unit = {\n\n    val kafkaBrokers = &quot;test.skyer9.pe.kr:9092&quot;\n    val kafkaTopic = &quot;exam&quot;\n\n    val spark = SparkSession.builder()\n      .master(&quot;local[1]&quot;)\n      .appName(&quot;Test&quot;)\n      .getOrCreate()\n\n    val df = spark.readStream\n      .format(&quot;kafka&quot;)\n      .option(&quot;kafka.bootstrap.servers&quot;, kafkaBrokers)\n      .option(&quot;subscribe&quot;, kafkaTopic)\n      .option(&quot;startingOffsets&quot;, &quot;earliest&quot;) \/\/ read from the beginning\n      .load()\n\n    df.printSchema()\n  }\n}<\/code><\/pre>\n<p>If IntelliJ does not recognize the package, close and reopen the project.<\/p>\n<h2>Adding the Kafka Package<\/h2>\n<p>The package org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 must be added.<\/p>\n<pre><code class=\"language-bash\">\/home\/spark\/spark-3.2.1-bin-hadoop3.2\/bin\/spark-submit --master local --deploy-mode client --name Remote --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 file:\/\/$HOME\/ReadTopic.jar<\/code><\/pre>\n<h2>Running the Project<\/h2>\n<p>The data is received in key-value form.<\/p>\n<pre><code class=\"language-bash\">root\n |-- key: binary (nullable = 
true)\n |-- value: binary (nullable = true)\n |-- topic: string (nullable = true)\n |-- partition: integer (nullable = true)\n |-- offset: long (nullable = true)\n |-- timestamp: timestamp (nullable = true)\n |-- timestampType: integer (nullable = true)<\/code><\/pre>\n<h2>Printing to the Console<\/h2>\n<p>Don&#039;t worry if Korean characters look broken in the IntelliJ console<br \/>\n(it is an IntelliJ issue).<br \/>\nIf you run the jar directly on the server, the output is correct.<\/p>\n<pre><code class=\"language-scala\">    \/\/ additionally requires these imports at the top of the file:\n    \/\/ import org.apache.spark.sql.functions.{col, from_json}\n    \/\/ import org.apache.spark.sql.types.{StringType, StructType}\n\n    val kafkaBrokers = &quot;test.skyer9.pe.kr:9092&quot;\n    val kafkaTopic = &quot;json_topic&quot;\n\n    val spark = SparkSession.builder()\n      .master(&quot;local[1]&quot;)\n      .appName(&quot;Test&quot;)\n      .getOrCreate()\n\n    val df = spark.readStream\n      .format(&quot;kafka&quot;)\n      .option(&quot;kafka.bootstrap.servers&quot;, kafkaBrokers)\n      .option(&quot;subscribe&quot;, kafkaTopic)\n      .option(&quot;startingOffsets&quot;, &quot;earliest&quot;) \/\/ read from the beginning\n      .load()\n\n    val jsonStringDF = df.selectExpr(&quot;CAST(value AS STRING)&quot;)\n\n    val schema = new StructType()\n      .add(&quot;dt&quot;, StringType)\n      .add(&quot;msg&quot;, StringType)\n\n    val jsonDF = jsonStringDF.select(from_json(col(&quot;value&quot;), schema).as(&quot;data&quot;))\n      .select(&quot;data.*&quot;)\n\n    jsonDF.writeStream\n      .format(&quot;console&quot;)\n      .outputMode(&quot;append&quot;)\n      .start()\n      .awaitTermination()     \/\/ keep waiting for input<\/code><\/pre>\n<h2>Saving to Hadoop<\/h2>\n<pre><code class=\"language-bash\">hadoop fs -rm -r \/streaming\/checkpointLocation\nhadoop fs -rm -r \/streaming\/out\/*\n\nhadoop fs -mkdir -p \/tmp\nhadoop fs -chmod 777 \/tmp\n\nhadoop fs -mkdir -p 
\/streaming\/out\nhadoop fs -mkdir -p \/streaming\/checkpointLocation<\/code><\/pre>\n<pre><code class=\"language-scala\">    jsonDF.writeStream\n      .format(&quot;com.databricks.spark.csv&quot;)   \/\/ the built-in &quot;csv&quot; format also works\n      .outputMode(&quot;append&quot;)\n      .option(&quot;path&quot;, &quot;hdfs:\/\/localhost:9000\/streaming\/out&quot;)\n      .option(&quot;checkpointLocation&quot;, &quot;hdfs:\/\/localhost:9000\/streaming\/checkpointLocation&quot;)\n      .start()\n      .awaitTermination()<\/code><\/pre>\n<h2>Saving to Hadoop in Parquet Format<\/h2>\n<pre><code class=\"language-bash\">hadoop fs -rm -r \/streaming\/checkpointLocation\nhadoop fs -rm -r \/streaming\/out\/*\n\nhadoop fs -mkdir -p \/streaming\/out\nhadoop fs -mkdir -p \/streaming\/checkpointLocation<\/code><\/pre>\n<pre><code class=\"language-scala\">    jsonDF.writeStream\n      .format(&quot;parquet&quot;)\n      .outputMode(&quot;append&quot;)\n      .option(&quot;path&quot;, &quot;hdfs:\/\/localhost:9000\/streaming\/out&quot;)\n      .option(&quot;checkpointLocation&quot;, &quot;hdfs:\/\/localhost:9000\/streaming\/checkpointLocation&quot;)\n      .start()\n      .awaitTermination()<\/code><\/pre>\n<h2>Reading the Data from Hive<\/h2>\n<pre><code class=\"language-bash\">hive\n\nhive (default)&gt; CREATE EXTERNAL TABLE tbl_weblog (\ndt string,\nmsg string\n)\nSTORED AS PARQUET\nLOCATION &quot;hdfs:\/\/localhost:9000\/streaming\/out&quot;;\n\nhive (default)&gt; select * from tbl_weblog;\n\nhive (default)&gt; select * from tbl_weblog order by dt desc limit 10;<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"<p>Reading a Kafka Topic from Spark Installing the Plugins Install the Scala and Big Data Tools plugins in IntelliJ. 
Checking the Scala Version spark-shell Using Spark&#039;s default log4j profile: org\/apache\/spark\/log4j-defaults.properties Setting default log level to &quot;WARN&quot;. To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 22\/04\/16 09:45:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform&#8230; using builtin-java\u2026 <span class=\"read-more\"><a href=\"https:\/\/www.skyer9.pe.kr\/wordpress\/?p=5270\">Read More &raquo;<\/a><\/span><\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[10],"tags":[],"class_list":["post-5270","post","type-post","status-publish","format-standard","hentry","category-kafka"],"_links":{"self":[{"href":"https:\/\/www.skyer9.pe.kr\/wordpress\/index.php?rest_route=\/wp\/v2\/posts\/5270","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.skyer9.pe.kr\/wordpress\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.skyer9.pe.kr\/wordpress\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.skyer9.pe.kr\/wordpress\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.skyer9.pe.kr\/wordpress\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=5270"}],"version-history":[{"count":19,"href":"https:\/\/www.skyer9.pe.kr\/wordpress\/index.php?rest_route=\/wp\/v2\/posts\/5270\/revisions"}],"predecessor-version":[{"id":5309,"href":"https:\/\/www.skyer9.pe.kr\/wordpress\/index.php?rest_route=\/wp\/v2\/posts\/5270\/revisions\/5309"}],"wp:attachment":[{"href":"https:\/\/www.skyer9.pe.kr\/wordpress\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=5270"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.skyer9.pe.kr\/wordpress\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=5270"},{"taxonomy":"p
ost_tag","embeddable":true,"href":"https:\/\/www.skyer9.pe.kr\/wordpress\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=5270"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}
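The examples in the post always read with `startingOffsets` set to `earliest`. Per Spark's Structured Streaming + Kafka integration guide, that option also accepts a JSON string of per-partition offsets, where -2 means earliest and -1 means latest for a partition. A minimal plain-Scala sketch of building such a string; the topic name and offset values below are hypothetical:

```scala
object StartingOffsets {
  // Build the JSON value Spark's Kafka source accepts for startingOffsets,
  // e.g. {"exam":{"0":23,"1":-2}} (-2 = earliest, -1 = latest for a partition).
  def forTopic(topic: String, offsets: Map[Int, Long]): String = {
    val parts = offsets.toSeq
      .sortBy(_._1)                          // stable partition order
      .map { case (p, o) => s"\"$p\":$o" }   // "partition":offset pairs
      .mkString(",")
    s"{\"$topic\":{$parts}}"
  }

  def main(args: Array[String]): Unit = {
    val json = forTopic("exam", Map(0 -> 23L, 1 -> -2L))
    println(json) // {"exam":{"0":23,"1":-2}}
  }
}
```

The resulting string would be passed to the reader shown above via `.option("startingOffsets", json)` in place of `"earliest"`.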