Перейти к основному содержанию
Перейти к основному содержанию

Интеграция Amazon Glue с ClickHouse и Spark

ClickHouse Supported

Amazon Glue — это полностью управляемый бессерверный сервис для интеграции данных от Amazon Web Services (AWS). Он упрощает процесс обнаружения, подготовки и преобразования данных для аналитики, машинного обучения и разработки приложений.

Установка

Чтобы интегрировать ваш код Glue с ClickHouse, вы можете использовать наш официальный коннектор Spark в Glue одним из следующих способов:

  • Установить коннектор ClickHouse Glue из AWS Marketplace (рекомендуется).
  • Вручную добавить JAR‑файлы Spark Connector в ваше задание Glue.
  1. Подпишитесь на коннектор

    Чтобы получить доступ к коннектору в вашей учётной записи, оформите подписку на ClickHouse AWS Glue Connector в AWS Marketplace.

  2. Предоставьте необходимые разрешения

    Убедитесь, что роль IAM вашего задания Glue имеет необходимые разрешения, как описано в руководстве по минимальным привилегиям здесь.

  3. Активируйте коннектор и создайте подключение

    Вы можете активировать коннектор и создать подключение напрямую, нажав эту ссылку, которая откроет страницу создания подключения Glue с заранее заполненными ключевыми полями. Задайте подключению имя и нажмите кнопку Create (на этом этапе нет необходимости указывать параметры подключения к ClickHouse).

  4. Использование в задании Glue

    В вашем задании Glue выберите вкладку Job details и разверните окно Advanced properties. В разделе Connections выберите только что созданное подключение. Коннектор автоматически добавит необходимые JAR‑файлы в среду выполнения задания.

Конфигурация подключений Glue Notebook
Примечание

JAR‑файлы, используемые в коннекторе Glue, собраны для Spark 3.3, Scala 2 и Python 3. Убедитесь, что вы выбираете эти версии при настройке вашего задания Glue.

Примеры

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.clickhouseScala.Native.NativeSparkRead.spark
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object ClickHouseGlueExample {
  def main(sysArgs: Array[String]) {
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)

    val sparkSession: SparkSession = SparkSession.builder
      .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
      .config("spark.sql.catalog.clickhouse.host", "<your-clickhouse-host>")
      .config("spark.sql.catalog.clickhouse.protocol", "https")
      .config("spark.sql.catalog.clickhouse.http_port", "<your-clickhouse-port>")
      .config("spark.sql.catalog.clickhouse.user", "default")
      .config("spark.sql.catalog.clickhouse.password", "<your-password>")
      .config("spark.sql.catalog.clickhouse.database", "default")
      // для ClickHouse Cloud
      .config("spark.sql.catalog.clickhouse.option.ssl", "true")
      .config("spark.sql.catalog.clickhouse.option.ssl_mode", "NONE")
      .getOrCreate

    val glueContext = new GlueContext(sparkSession.sparkContext)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    import sparkSession.implicits._

    val url = "s3://{path_to_cell_tower_data}/cell_towers.csv.gz"

    val schema = StructType(Seq(
      StructField("radio", StringType, nullable = false),
      StructField("mcc", IntegerType, nullable = false),
      StructField("net", IntegerType, nullable = false),
      StructField("area", IntegerType, nullable = false),
      StructField("cell", LongType, nullable = false),
      StructField("unit", IntegerType, nullable = false),
      StructField("lon", DoubleType, nullable = false),
      StructField("lat", DoubleType, nullable = false),
      StructField("range", IntegerType, nullable = false),
      StructField("samples", IntegerType, nullable = false),
      StructField("changeable", IntegerType, nullable = false),
      StructField("created", TimestampType, nullable = false),
      StructField("updated", TimestampType, nullable = false),
      StructField("averageSignal", IntegerType, nullable = false)
    ))

    val df = sparkSession.read
      .option("header", "true")
      .schema(schema)
      .csv(url)

    // Запись в ClickHouse
    df.writeTo("clickhouse.default.cell_towers").append()


    // Чтение из ClickHouse
    val dfRead = spark.sql("select * from clickhouse.default.cell_towers")
    Job.commit()
  }
}

Подробности см. в нашей документации по Spark.