flink大数据15天速成教程

张开发
2026/4/12 21:27:27 15 分钟阅读

分享文章

flink大数据15天速成教程
一、环境部署与启动🛑 准备工作(第 0 天):选对版本,事半功倍版本选择:不要追最新(如 1.19),也不要太旧(如 1.12)。推荐:Flink 1.17 或 1.18(LTS 长期支持版,稳定,社区连接器支持好)。JDK:Java 8 或 Java 11(推荐 11,性能更好)。IDE:IntelliJ IDEA(必须)。环境:本地开发用LocalExecutionEnvironment,依赖组件(Kafka, MySQL, Redis)用Docker一键启动(避免装环境浪费时间)1.1、flink1.18下载https://mirrors.aliyun.com/apache/flink/flink-1.18.1/1.2、JDK11下载国内下载地址,免登录Index of java-local/jdk/11.0.2+7java jdk 国内下载镜像地址1)TUNA镜像https://mirrors.tuna.tsinghua.edu.cn/Adoptium/2)HUAWEI镜像https://repo.huaweicloud.com/java/jdk/3)injdkhttps://d.injdk.cn/download/openjdk其他地址:1) https://openjdk.java.net2) https://jdk.java.net/archive/3)https://github.com/openjdk/jdk等待安装结束, 配置环境变量(此处省略)window + s 快捷键,输入env,进入环境变量配置变量名:JAVA_HOME变量值:C:\Program Files\Java\jdk-11.0.2(JDK的安装路径,这里以你自己的安装路径为准)新建CLASSPATH变量,变量值为:.;%JAVA_HOME%\lib;%JAVA_HOME%\lib\tools.jar(注意前面是有一个点的),配置好之后如下图,这里是可以复制粘贴的点击确定配置path,找到path,双击或者点编辑,最后一行添加输入%JAVA_HOME%\bin点击确定windows + R, 输入cmdJAVA --version,打印JAVA版本号说明配置环境成功1.3、maven下载安装国内下载地址把下载好的apache-maven-xxx-bin.zip解压到一个不带中文、不带空格的路径D:\Program Files\apache-maven-3.8.9目录结构说明:bin:包含 mvn 命令 conf:配置文件所在目录 lib:依赖 jar 包配置环境变量(关键步骤)1. 新建 MAVEN_HOME此电脑 → 右键属性 → 高级系统设置 → 环境变量系统变量 → 新建变量名:MAVEN_HOME变量值:你的 Maven 路径,如D:\Program Files\apache-maven-3.8.92. 配置 Path找到系统变量里的 Path → 编辑新建:%MAVEN_HOME%\bin验证是否安装成功打开 cmd,输入mvn -v如果出现版本信息,说明安装成功。配置阿里云镜像(解决下载慢)Maven 官方仓库在国外,下载极慢,必须换成阿里云镜像。打开 conf/settings.xml,找到 mirrors 标签,添加:mirrors mirror idaliyunmaven/id mirrorOf*/mirrorOf name阿里云公共仓库/name urlhttps://maven.aliyun.com/repository/public/url /mirror /mirrors1.4、下载IDEAidea配置maven环境和java环境二、学习进度15 天速成的核心策略是:不求甚解原理,只求跑通场景。不要一开始就去读源码或钻研底层分布式理论,以“构建一个能跑的数据管道”为目标。📅 15 天学习路线图第一阶段:跑通第一个作业,建立信心(Day 1 - Day 3)目标:不纠结概念,先让代码跑起来,看到数据流动。Day 1: 跑通一个demo第一步:新建 Maven 项目打开 IDEA,点击New Project。左侧选择Maven。JDK:选择你安装的 JDK(建议JDK 1.8或JDK 11,Flink 1.18 都支持,如果你不确定,选 1.8 最稳妥)。点击Next,填写项目名(例如flink-pro)。点击Finish。第二步:修改pom.xml(核心步骤)打开项目根目录下的pom.xml,删除里面所有内容,直接复制下面的代码进去。我已经帮你配好了 Flink 核心依赖、日志依赖和阿里云镜像(下载更快)。?xml version="1.0" encoding="UTF-8"? project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" modelVersion4.0.0/modelVersion groupIdcom.juxin/groupId artifactIdflink_pra/artifactId version1.0-SNAPSHOT/version properties !-- 修正这里:必须是 Java 版本,不能是插件版本 -- maven.compiler.source11/maven.compiler.source maven.compiler.target11/maven.compiler.target project.build.sourceEncodingUTF-8/project.build.sourceEncoding flink.version1.18.1/flink.version java.version11/java.version /properties dependencies !-- 1. Flink 核心依赖 -- dependency groupIdorg.apache.flink/groupId artifactIdflink-streaming-java/artifactId version${flink.version}/version /dependency !-- 2. Flink 客户端依赖 -- dependency groupIdorg.apache.flink/groupId artifactIdflink-clients/artifactId version${flink.version}/version /dependency !-- 3. 日志依赖 (Flink 1.18 推荐组合) -- dependency groupIdorg.slf4j/groupId artifactIdslf4j-log4j12/artifactId version1.7.36/version /dependency dependency groupIdorg.apache.logging.log4j/groupId artifactIdlog4j-core/artifactId version2.17.1/version /dependency dependency groupIdorg.apache.logging.log4j/groupId artifactIdlog4j-api/artifactId version2.17.1/version /dependency dependency groupIdorg.apache.logging.log4j/groupId artifactIdlog4j-slf4j-impl/artifactId version2.17.1/version /dependency /dependencies build plugins plugin groupIdorg.apache.maven.plugins/groupId artifactIdmaven-compiler-plugin/artifactId version3.8.0/version configuration source${java.version}/source target${java.version}/target /configuration /plugin /plugins /build /project等待安装结束新建一个测试demopackage com.juxin.day01; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.datastream.DataStream; public class SimpleFlinkJob { public static void main(String[] args) throws Exception { // 1. 创建执行环境 (本地模式) // 这一步相当于启动了"迷你集群" StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并行度为 1,方便新手看日志,不然日志会乱序 env.setParallelism(1); // 2. 创建数据源 (模拟数据:1, 2, 3, 4, 5) DataStreamInteger dataStream = env.fromElements(1, 2, 3, 4, 5); // 3. 简单算子处理 (每个数字乘以 10) DataStreamInteger resultStream = dataStream.map(value - { // 这里可以打断点调试 System.out.println("正在处理数据:" + value); return value * 10; }); // 4. 打印结果到控制台 resultStream.print(); // 5. 提交执行 // 程序会卡在这里,直到任务结束或被手动停止 env.execute("我的第一个 Flink 任务"); } }运行,日志打印如下Day 2: 消费 Kafka消息2.1、pom.xml里面添加依赖!-- Kafka -- dependency groupIdorg.apache.flink/groupId artifactIdflink-connector-kafka/artifactId version1.17.1/version /dependency2.2、docker安装kafka2.2.1:创建网络docker network create kafka-net如果提示网络已存在,可以忽略,或者先删除:docker network rm kafka-net2.2.2、启动 Zookeeperdocker run -d \ --name zookeeper \ --network kafka-net \ -p 2181:2181 \ -e ZOOKEEPER_CLIENT_PORT=2181 \ -e ZOOKEEPER_TICK_TIME=2000 \ swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/zookeeper:latest等 10 秒,让 Zookeeper 完全启动。2.2.3、启动 Kafkastrimzi/kafka= Kubernetes 专用镜像 →不支持 docker 环境变量启动bitnami/kafka= Docker 标准镜像 →完美支持 KAFKA_环境变量*docker run -d --name kafka --network kafka-net -p 9092:9092 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.50.3.101:9092 -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/bitnami/kafka:3.7注意:Kafka broker 启动时会注册自己的advertised.listeners地址,并将这个地址返回给客户端KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.50.3.101:9092 这里的10.50.3.101就是kafka所在服务器的ip地址✅ 验证启动成功# 查看容器状态 docker ps # 应该看到两个 Up 状态的容器# 进入 Kafka 容器 docker exec -it kafka bash # 1. 查看 Broker 列表(能通就说明服务活了) kafka-broker-api-versions.sh --bootstrap-server localhost:90922.2.4、消费和生产消息2.2.4.1、创建 Topic 命令kafka-topics.sh --create \ --topic my-topic \ --bootstrap-server localhost:9092 \ --partitions 1 \ --replication-factor 1说明:my-topic:主题名--partitions 1:分区数(单节点写 1)--replication-factor 1:副本数(单节点写 1)2.2.4.2、消费消息命令(2 种最常用)① 从最新消息开始消费kafka-console-consumer.sh \ --topic my-topic \ --bootstrap-server localhost:9092② 从头消费(能看到历史所有消息)kafka-console-consumer.sh \ --topic my-topic \ --bootstrap-server localhost:9092 \ --from-beginning

更多文章