Spark 에서 Kafka Topic 읽어오기

By | 2022년 4월 16일
Table of Content

Spark 에서 Kafka Topic 읽어오기

플러그인 설치

IntelliJ 에서 Scala, Big Data Tools 을 설치한다.

Scala 버전 확인

spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/16 09:45:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://ip-172-31-18-128.ap-northeast-2.compute.internal:4040
Spark context available as 'sc' (master = local[*], app id = local-1650102324143).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.1
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_312)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :quit

프로젝트 생성하기

Scala 를 선택하고, JDK 버전은 8 로 선택한다.
확인한 Scala 버전을 설정해 준다.

추가설정하기

여기 를 참조해 설정해 준다.

프로젝트를 실행하면, jar 생성 후, 서버에 복사해 주고,
Spark-Submit 까지 자동으로 해준다.

우선은 정상실행만 확인해 본다.

/home/spark/spark-3.2.1-bin-hadoop3.2/bin/spark-submit --master local --deploy-mode client --name Unnamed file://$HOME/ReadTopic.jar

의존성 추가

build.sbt 에 의존성을 추가해 준다.

ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "2.12.15"

val sparkVersion = "3.2.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion
)

lazy val root = (project in file("."))
  .settings(
    name := "ReadTopic"
  )

Scala class 수정

Main.scala 클래스를 수정해 준다.

import org.apache.spark.sql.SparkSession

object ReadTopic {
  def main(args: Array[String]) {

    val kafkaBrokers = "test.skyer9.pe.kr:9092"
    val kafkaTopic = "exam"

    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("Test")
      .getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBrokers)
      .option("subscribe", kafkaTopic)
      .option("startingOffsets", "earliest") // From starting
      .load()

    df.printSchema()
  }
}

패키지를 인식하지 못하면 프로젝트를 닫았다 다시 열어준다.

추가설정하기

패키지 org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 가 추가되어야 한다.

/home/spark/spark-3.2.1-bin-hadoop3.2/bin/spark-submit --master local --deploy-mode client --name Remote --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 file://$HOME/ReadTopic.jar

프로젝트 실행하기

key-value 형식으로 데이타가 수신된다.

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

화면에 출력하기

IntelliJ 콘솔에서 한글이 깨지는 것은 신경쓸 필요없다.
(IntelliJ 문제)
서버에서 직접 실행해 보면 정상적으로 출력된다.

    val kafkaBrokers = "test.skyer9.pe.kr:9092"
    val kafkaTopic = "json_topic"

    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("Test")
      .getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBrokers)
      .option("subscribe", kafkaTopic)
      .option("startingOffsets", "earliest") // From starting
      .load()

    val jsonStringDF = df.selectExpr("CAST(value AS STRING)")

    val schema = new StructType()
      .add("dt",StringType)
      .add("msg",StringType)

    val jsonDF = jsonStringDF.select(from_json(col("value"), schema).as("data"))
      .select("data.*")

    jsonDF.writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()     // 입력을 계속 기다림

hadoop 에 저장하기

hadoop fs -rm -r /streaming/checkpointLocation
hadoop fs -rm -r /streaming/out/*

hadoop fs -mkdir -p /tmp
hadoop fs -chmod 777 /tmp

hadoop fs -mkdir -p /streaming/out
hadoop fs -mkdir -p /streaming/checkpointLocation
    jsonDF.writeStream
      .format("com.databricks.spark.csv")
      .outputMode("append")
      .option("path", "hdfs://localhost:9000/streaming/out")
      .option("checkpointLocation", "hdfs://localhost:9000/streaming/checkpointLocation")
      .start()
      .awaitTermination()

hadoop 에 parquet(파케이) 포멧으로 저장하기

hadoop fs -rm -r /streaming/checkpointLocation
hadoop fs -rm -r /streaming/out/*

hadoop fs -mkdir -p /streaming/out
hadoop fs -mkdir -p /streaming/checkpointLocation
    jsonDF.writeStream
      .format("parquet")
      .outputMode("append")
      .option("path", "hdfs://localhost:9000/streaming/out")
      .option("checkpointLocation", "hdfs://localhost:9000/streaming/checkpointLocation")
      .start()
      .awaitTermination()

hive 에서 데이타 읽기

hive

hive (default)> CREATE EXTERNAL TABLE tbl_weblog (
dt string,
msg string
)
STORED AS PARQUET
LOCATION "hdfs://localhost:9000/streaming/out";

hive (default)> select * from tbl_weblog;

hive (default)> select * from tbl_weblog order by dt desc limit 10;

답글 남기기