๐ฅ
[Spark] spark์์ s3 ์ ๊ทผํ๊ธฐ (ACCESS_KEY, SECRET_KEY) ๋ณธ๋ฌธ
[Spark] spark์์ s3 ์ ๊ทผํ๊ธฐ (ACCESS_KEY, SECRET_KEY)
•8• 2023. 12. 19. 18:142.4.4 ์ดํ์ 2.4.5 ์ด์ ๋ฒ์ ์์ ํ๋ก configuration ์ค์ ํ๋๊ฒ ์ข ๋ค๋ฅธ ๋ฏ ํ๋ค.
Spark ๋ฒ์ 2.4.4 ์ดํ
spark = SparkSession.builder.appName("myapp") \
.config("some.config", "some.value") \
.getOrCreate()
# signature V4 ๋ฅผ ์ฌ์ฉํ๋ ๋ฆฌ์ ์ผ ๊ฒฝ์ฐ ์๋ property ์ค์
spark.sparkContext.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
# s3 ์ ๋ณด ์ค์
spark.sparkContext._jsc.hadoopConfiguration().set(f"fs.s3a.bucket.{mybucket}.endpoint", my_s3_url)
spark.sparkContext._jsc.hadoopConfiguration().set(f"fs.s3a.bucket.{mybucket}.access.key", my_access_key)
spark.sparkContext._jsc.hadoopConfiguration().set(f"fs.s3a.bucket.{mybucket}.secret.key", secret_key)
2.4.4 ์ดํ๋ ์คํํฌ์์ ํ๋ก configuration์ ์ ๊ทผํ๋ ค๋ฉด _jsc object (java spark context) ๋ฅผ ํตํด์ ํด์ผํ๋ค.
์ค๊ฐ์ fs.s3a.bucekt.๋ฒํท๋ช . ... ์ด๋ฐ์์ผ๋ก ํค๊ฐ์ ์ค ์ด์ ๋ ์๋ก ๋ค๋ฅธ ํค๋ฅผ ๊ฐ์ง ์ฌ๋ฌ๊ฐ์ ๋ฒํท์ ์ ๊ทผํ๊ธฐ ์ํจ์ด๋ค.
Spark ๋ฒ์ 2.4.5 ์ด์
conf = SparkConf()
conf.set('spark.hadoop.fs.s3a.endpoint', my_s3_url)
conf.set('spark.hadoop.fs.s3a.access.key', my_access_key)
conf.set('spark.hadoop.fs.s3a.secret.key', my_secret_key
spark = SparkSession.builder.appName("myapp") \
.config("some.config", "some.value") \
.getOrCreate()
2.4.5๋ถํฐ๋ hadoop configure๋ฅผ ์ค์ ํ๊ธฐ ์ํด _jsc ๋์ ๊ทธ๋ฅ `SparkConf.set()`์ ์ฌ์ฉํด์ ์ค์ ํ ์ ์๋ค.
(์ฐธ๊ณ : https://spark.apache.org/docs/latest/configuration.html#custom-hadoophive-configuration)
[์ฐธ๊ณ ] ACCESS_KEY, SECRET_KEY ๊ฐ์ ธ์ค๋ ๋ฐฉ๋ฒ
์คํฌ๋ฆฝํธ์ ํค ๊ฐ์ ๋ฐ์๋์ ์๋ ์์ด ์์นํด๋ณด๋ boto3 ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํ๊ฑฐ๋ spark config์ ๋ฃ์ด์ฃผ๋ ๋ฏ ํ๋ค.
boto3 ๋ผ์ด๋ธ๋ฌ๋ฆฌ ์ฌ์ฉ
์ ํญ๋ชฉ์ค์ ํ๋๋ก ์๊ฒฉ์ฆ๋ช ์ ๊ด๋ฆฌํ๊ณ ์๋ค๋ฉด ์๋์ ๊ฐ์ด boto ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋ค.
import boto3
# default profile์ธ ๊ฒฝ์ฐ profile_name ์๋ต ๊ฐ๋ฅ
boto_session = boto3.Session(profile_name='my-profile')
credentials = boto_session.get_credentials()
access_key = credentials.access_key
secret_key = credentials.secret_key
์ด ๋ฐฉ๋ฒ์ cluster deploy mode ์์๋ ์ฌ์ฉํ์ง ๋ชปํ ๊ฒ ๊ฐ๋ค.
spark-defaults.conf ์ ์ค์
1ํ์ฑ ์คํฌ๋ฆฝํธ์ ๋งค๋ฒ boto3์ ์ด์ฉํด access key์ secret key๋ฅผ ๋ฃ๋๊ฒ 4์ค์ด์ง๋ง ๊ท์ฐฎ์ ์ ์๋ค.
๊ทธ๋ด ๋๋ spark ์ค์น ๊ฒฝ๋ก์ ์๋ spark-defaults.conf ์ค์ ํ์ผ์ ์์ ํ๋ฉด ๋ ๋ฏ ํ๋ค.
ํ์ผ์ ์ด์ด๋ณด๋ฉด "spark." ์ผ๋ก ์์ํ๋ ์ฌ๋ฌ๊ฐ์ง ์ค์ ๊ฐ๋ค์ด ์๋๋ฐ ์์์ ์คํํฌ ์ฝ๋ ๋ด์์ ์ค์ ํ๋ spark.hadoop.fs.s3a.access.key ๊ฐ์ ๊ฐ๋ค์ ์ฌ๊ธฐ์ ์ง์ด๋ฃ์ด์ฃผ๋ฉด ๋๋ค.
* ์ถ๊ฐ์ฐธ๊ณ !
pyspark๊ฐ submit๋๊ณ sparkcontext intialize ์์ ์ sparkContext๋ Py4J๋ฅผ ํตํด java gateway๋ฅผ ์ด๊ธฐํ ํจ (JVM์ ์ฐ๊ฒฐ)
`sc._jvm`: JVM์ ๊ฒ์ดํธ์จ์ด
`sc._jsc`: ํด๋น JVM์ spark context์ ๋ํ ํ๋ก์
์ผ๋ฐ์ ์ผ๋ก `_jsc` ๋ฉค๋ฒ๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ ๋น์ถ๋ผ๊ณ ํ๋ค.
์ถ๊ฐ๋ก python์ผ๋ก ์ ์๋ udf์ ๊ฒฝ์ฐ jvm ๊ฐ์ฒด๋ง์ผ๋ก๋ ์คํ๋ ์๊ฐ ์์ด์ python subprocess๋ฅผ ์คํํจ -> ๋ฐ์ดํฐ ์ ์ก ๋ถํ๊ฐ ์์ด ๋ณ๋ชฉํ์ ๋ฐ์
์ต๋ํ dataframeapi๋ก๋ง ์ฝ๋๋ฅผ ์์ฑํ๋๊ฒ ์ข์
์ด๋ถ๋ถ์ ๋์ค์ ์ถ๊ฐ ์ ๋ฆฌ ํ๊ธฐ
'๋ฐ์ดํฐ > Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Spark] GraphX (0) | 2024.03.18 |
---|---|
[Spark] Spark Join ์ข ๋ฅ (0) | 2024.03.18 |
[Spark] s3 ๋ฐ์ดํฐ dataframe์ผ๋ก ๋ก๋ํ๊ธฐ (0) | 2023.06.01 |
[Spark] TaskMemoryManager: Failed to allocate a page (2097152 bytes), try again. ์ค๋ฅ ๋ฐ์ ์ ํด๊ฒฐ ๋ฐฉ๋ฒ (0) | 2023.05.22 |
[Spark] CSV ํ์ผ ๋ก๋ํ๊ธฐ (0) | 2023.04.25 |