glimpse of saga

项目中遇到多个微服务调用需要考虑和处理某个环节失败时的处理。虽然这里不需要很强的事务概念,但是需要对失败的动作进行重试等操作。这里的重试本质上就是 rollback 的另一种形式,在 saga 里算是“forward recovery”。
借机又翻看了一下相关的文章,贴到了文末。

Saga vs TCC

  1. Saga 相比 TCC 的缺点是缺少预留动作,所以某些情况补偿的实现比较麻烦甚至无法撤销只能补救。不过没有预留动作也意味着不必担心资源释放的问题。
  2. TCC 最少通信次数为 2n,Saga 为 n(n=sub-transaction 的数量)。
  3. 第三方服务没需要提供有 Try 接口。
    总体感觉下来 SAGA 更适合微服务的多数场景。

Simple Saga

解决这类问题当然可以直接引入一些已存在的 saga 框架,不过这里存在学习、部署等成本。如果只是小范围的解决问题,或许可以使用下面的形式。
示意图
上面示意图针对的场景是:服务的执行都需要较长时间、并且是异步调用。
如果各个服务执行时间都不长,一个调用链下来小于几百毫秒,那么直接使用 reactive style 的编码也应该可以。
因为各服务执行时间较长,所以不能使用同步调用。这里耗时指的是对于有 UI 的程序至少影响到到 UI 前的用户,如果是后台应用那么至少阻塞的时长影响到系统的资源可用性。
即使服务执行时间短,同步调用也会使调用链的 availability 降低,所以微服务的场景下使用异步调用有天然的好处。

从这个示意图其实可以看作是 Chris 演讲中提到的最最原始的模式。可以把 callback 看作是 saga 事务参与方发送消息到 message broker。而调用链的第一个节点就充当了 saga 的协调者。
各个微服务的 updateStatus 端点就是 message 的 listner,只不过这里直接通过 callback 实现而没有利用消息队列。
最开始的 endpoint 负责生成一个 transactionId 并依次传递给每个下游服务,每个下游服务通过 callback 把自己的状态更新给上游。

  1. getStatus() 端点提供给 UI 获取当前状态。
    UI上的状态显示

  2. transCheckAndAmend(trans_n) 每个服务暴露的业务方法都需要提供一个补偿方法。
    Compensating transactions

  3. 服务的入口方法其实充当了协调者, 更像 orchestration 的,而不是 choreography 的。

  4. Timer 是个后台定时器不停的检查服务状态,如果状态不成功就调用 compensating endpoint.

Reference:
[1]: Saga 的经典论文 https://www.cs.cornell.edu/andru/cs711/2002fa/reading/sagas.pdf
[2]: 《Microservice Pattern》”Chapter4, Managing transactions with sagas”
[3]: Chris Richardson 在 2017 年的演讲:https://www.youtube.com/watch?v=YPbGW3Fnmbc

一些中文网文:
[3] 分布式事务:Saga 模式 https://www.jianshu.com/p/e4b662407c66
[4] 七种分布式事务的解决方案 https://cloud.tencent.com/developer/article/1806989
[5] 分布式事务六种解决方案 https://zhuanlan.zhihu.com/p/183753774

响应式并发批处理

假设 DataProcessor 接口定义了方法 batchProcess 能够对一批数据进行处理,一批处理 500 个数据。现在我们需要对一个响应式数据流 Flux dataItems 调用 batchProcess() 进行处理。

public interface DataProcessor {
  Mono<String> batchProcess(List<DataItem> dataItems);
  ... ...
}


DataProcessor dataProcessor = ...;

int batchSize = 500;

Flux<DataItem> dataItems = ...

下面分别以串行和并行的方式展示一下 Reactor API 的使用。

1)攒够 batchSize 个数据后进行处理。

Mono<List<String>> result = dataItems.buffer(batchSize)
    .flatMap(dataProcessor::batchProcess)
    .collectList();

2)以并行的方式,把流分成 10 股,每股攒够 batchSize 个数据后进行处理。

Mono<List<String>> result = dataItems
    .parallel(10)
    .runOn(Schedulers.fromExecutor(Executors.newFixedThreadPool(10)))
    .groups()
    .flatMap(g -> g.buffer(batchSize).flatMap(dataProcessor::batchProcess))
    .collectList();

这里 runOn 接收的参数可以是 Schedulers 不同策略的实现,具有不同适用范围,比如适合计算密集型的 ParallelScheduler、单线程的 SingleScheduler。这里使用的是 Executors FixedThreadPool。

可以想象如果我们自己实现这样一个处理逻辑的复杂度,而通过 reactor api,仅仅几行代码就完成了这么复杂高效的处理。

3)使用 reactive mongo driver 需要的线程。

Spring 默认到 monog 的链接池最大为 100,但是实际上在使用 reactive 方式访问时使用 20 ~ 10 个左右的线程就足够了。因此对 mongog 的连接串最好明确使用适合自己情况的连接数以避免连接浪费或不够。
测试了一个 70 万条、大概 250M 数据的批量插入,发现无论使用串行还是并行,数据库插入时间都差不多(36s ~ 26s)。而连接池最大连接设为 200、100、50、20、10 对数据库插入的性能也没有太大影响,200 个线程时反而有一点下降。这个情况从 mongo 响应式驱动的角度去解释是完全可以理解的,如果使用传统驱动,恐怕所需的线程就不是这个量级的了。

使用Oracle 12c容器服务创建数据库

1) 启动Oracle服务容器

docker run -d --name oracle -p1521:1521 -v OracleDBData:/ORCL store/oracle/database-enterprise:12.2.0.1-slim

参考:https://hub.docker.com/u/dhyuan/content/sub-2794a3fe-cd52-4dcb-bf8e-3fda40f02a83

通过docker inspect oracle 可以看到被映射在 /var/lib/docker/volumes/OracleDBData

"Mounts": [
    {
        "Type": "volume",
        "Name": "OracleDBData",
        "Source": "/var/lib/docker/volumes/OracleDBData/_data",
        "Destination": "/ORCL",
        "Driver": "local",
        "Mode": "z",
        "RW": true,
        "Propagation": ""
    }
],

2)进入Oracle,执行sqlplus。

oracle 12c 是多租户数据库,有个核心概念CDB,PDB需要理解。进入容器oracle:
参考: https://docs.oracle.com/database/121/CNCPT/cdbovrvw.htm#CNCPT89236

docker exec -it oracle bash

sqlplus / as sysdba

show user
show con_name
show pdbs

查看当前用户连接到的数据库是CDB还是PDB。

SELECT NAME, CDB, CON_ID FROM V$DATABASE;

3)创建PDB

创建PDB之前需要设置 FILE_NAME_CONVERT 或者 db_create_file_dest
参考:https://smarttechways.com/2019/03/05/ora-65016-file_name_convert-must-be-specified/
https://segmentfault.com/a/1190000038344836
https://dba.stackexchange.com/questions/190451/unable-to-connect-to-pluggable-database-in-oracle-12c

检查PDB$SEED database文件的位置:

SQL> alter session set container=PDB$SEED;

Session altered.

SQL> select FILE_NAME from dba_data_files;

FILE_NAME
--------------------------------------------------------------------------------
/u02/app/oracle/oradata/ORCL/pdbseed/system01.dbf
/u02/app/oracle/oradata/ORCL/pdbseed/sysaux01.dbf
/u02/app/oracle/oradata/ORCL/pdbseed/xdb01.dbf

SQL>

因为创建PDB时需要连接在CDB$ROOT,所以先检查当前连接。如果不是,则切换到CDB。

SQL> select sys_context ('USERENV', 'CON_NAME') from dual;

SYS_CONTEXT('USERENV','CON_NAME')
--------------------------------------------------------------------------------
PDB$SEED

SQL>  alter session set container = CDB$ROOT;

Session altered.

SQL> select sys_context ('USERENV', 'CON_NAME') from dual;

SYS_CONTEXT('USERENV','CON_NAME')
--------------------------------------------------------------------------------
CDB$ROOT

创建一个PDB: YDPDB 用户:ydhbqb

    SQL> CREATE PLUGGABLE DATABASE YDPDB ADMIN USER ydhbqb IDENTIFIED BY ydhbqb0531 FILE_NAME_CONVERT=('/u02/app/oracle/oradata/ORCL/pdbseed/','/u02/app/oracle/oradata/ORCL/pdbseed/YDPDB/');

赋予sysdba权限:

SQL>  grant SYSDBA to ydhbqb;

Grant succeeded.
   

Pluggable database created.

SQL> show pdbs;

    CON_ID CON_NAME              OPEN MODE  RESTRICTED
---------- ------------------------------ ---------- ----------
    2 PDB$SEED              READ ONLY  NO
    3 ORCLPDB1              READ WRITE NO
    4 YDPDB              MOUNTED
SQL> 

SQL> alter pluggable database YDPDB open;

Pluggable database altered.

SQL> show pdbs;

    CON_ID CON_NAME              OPEN MODE  RESTRICTED
---------- ------------------------------ ---------- ----------
    2 PDB$SEED              READ ONLY  NO
    3 ORCLPDB1              READ WRITE NO
    4 YDPDB                  READ WRITE NO
SQL> 

让Oracle记住这个状态:

alter pluggable database YDPDB save state;

关闭PDB

alter pluggable database YDPDBclose immediate;

查看错误信息

select * from PDB_PLUG_IN_VIOLATIONS;

4) 连接到PDB。

1) 用sqlplus连接到PDB。

sqlplus ydhbqb/ydhbqb0531@localhost:1521/ydpdb.localdomain

2) Spring DS

spring:
    datasource:
        url:  jdbc:oracle:thin:@localhost:1521/ydpdb.localdomain
        username: ydhbqb
        password: ydhbqb0531

通过Ansible在Ubuntu KVM Host上安装VM和Docker

0) 前提

Ubuntu HOST 已经安装了 KVM 环境并工作良好。

1) 在 Ubuntu KVM HOST 上安装 ansible

# Ansible
sudo apt install -y software-properties-common
sudo apt-add-repository --yes --update ppa:ansible/ansible
sudo apt install -y ansible

2) 安装其它相关工具包

sudo apt install -y --no-install-recommends \
dnsmasq \
git \
genisoimage \
libguestfs-tools \
libosinfo-bin \
python3-libvirt \
python3-lxml \
qemu-utils \
virtinst

3) checkout virt-infra 代码库

mkdir ~/devenv_bootstrap

git clone https://github.com/dhyuan/virt-infra-ansible.git
git check -b devenv origin/devenv

cd roles
git clone https://github.com/dhyuan/ansible-role-virt-infra.git
git check -b devenv orign.devenv

4) 确保宿主机上有个网桥名与客户机定义文件所用网络名一致。

比如 inventory/ubuntu20Server.yml 中我们定义了网桥网络使用 br0。

virt_infra_networks:
        - name: br0

5) 使用 ansible 在 ubuntu 宿主机上安装客户机

http://cloud-images.ubuntu.com/focal/current/focal-server-cloudimg-amd64.img 下载镜像保持到 host 的 /var/lib/libvirt/images 目录。确保 kvmhost 上的/etc/hosts 中没有定义对 ubuntu20Server 的 ip 映射。

apt install -y ansible

ansible-playbook --limit kvmhost,ubuntu20Server ./virt-infra.yml

删除安装的虚拟机:

ansible-playbook ./virt-infra.yml --limit kvmhost,ubuntu20Server --extra-vars virt_infra_state=undefined

6) 创建完毕,通过 virt-manager 启动客户机。

测试网络的连通性,在 kvmhost 上 ping ubuntu20Server。有问题可以通过 ip 命令查看,进行必要的网卡启动等操作、检查/etc/netplan/….yml 文件、执行 netplan apply 等操作。

7) 下载在 VM 安装 Docker 的脚本,并安装

在 kvmhost 上执行:

ansible-galaxy install \
--roles-path ~/.ansible/roles/ \
git+https://github.com/haxorof/ansible-role-docker-ce.git,3.3.2

#
ansible-galaxy list

~/devenv_bootstrap/virt-infra-ansible/install_docker.yml 里定义了 ansbile 所用配置。执行以下语句,将在第五步创建的 VM ubuntu20Server 中安装 Docker 20.10.4。

ansible-playbook ./install_docker.yml --limit ubuntu20Server -v --extra-vars docker_version=5:20.10.4~3-0~ubuntu-focal

可以通过 docker_users 参数指定加入 docker group 的用户,使用 ali 镜像下载 docker 安装包:

ansible-playbook ./install_docker.yml \
--limit ubuntu20Server -v \
--extra-vars docker_version=5:20.10.4~3-0~ubuntu-focal \
--extra-vars '{"docker_users": ["devops"] }' \
-e '{"docker_repository_url": {"Ubuntu20": "http://mirrors.aliyun.com/docker-ce/linux/ubuntu/dists/focal/Release"}}'

在Ubuntu20上创建桥接网络和虚拟机

1) 网络管理相关命令

之前常用的包含在 net-tools 工具包里的命令 ifconfig、netstat 已经被 iproute2 工具包里的 ip 等命令所取代。
所以以后在非遗留系统,就没必要再使用不被维护的老命令了。

ip link show
ip link show eth0
ip link set eth0 up|down

ip addr show
ip addr show eth0
ip add add|del 192.168.0.99/24 eth0

ip route
ip neigh

ss -l
ss -a

lsof -i // -i 网络 -u 用户  -p 进程ID -c 进程名

2) Ubuntu 网络管理

如果是 Ubuntu Desktop 环境,就不妨使用 NetworkManager 和 nm-connection-editor 这些 GUI 程序或者 nmcli、nmtui 来管理网络及设备。
如果是 Ubuntu Server 环境,还是用 systemd-netword 比较好。
NetworkManager 和 networkd 是有冲突的,选择使用一个之后另一种方式需要被禁用。下面的命令是操作相关服务所用的命令,其中 enable/disable 用于设置是否开机启动而 mask/unmask 则用于设置服务是否可用。选择使用 NetworkManager 之后检查以下 cat /etc/NetworkManager/NetworkManager.conf,确保 managered 被设置为 true。

sudo systemctl stop NetworkManager
sudo systemctl disable NetworkManager
sudo systemctl mask NetworkManager

sudo systemctl unmask systemd-networkd.service
sudo systemctl enable systemd-networkd.service
sudo systemctl start systemd-networkd.service

在 NetworkManager 和 networkd 之上,我们可以使用 netplan 来进行更高层次的控制。参考:https://netplan.io/

Netplan is a utility for easily configuring networking on a linux system. You simply create a YAML description of the required network interfaces and what each should be configured to do. From this description Netplan will generate all the necessary configuration for your chosen renderer tool.

3)创建网桥、虚拟机,设置虚拟机桥接网卡。

实验环境使用 Ubuntu20 Desktop 做宿主机,感觉用 nm-connection-editor 创建网桥、配置网桥出口也很方便、顺利。具体步骤可参考http://www.zrway.com/news/8366.html。

下图是通过 nm-connection-editor 创建出网桥,并把物理网卡插到这个网桥上。注意,把网卡作为网桥的 salver 设备之后,需要通过 GUI 删除这个网络网卡。

创建网桥

通过 virt-manager 创建出虚拟机后,需要设置虚拟机的网卡。这里有个 QEMU 前端设备、后端后端设备(backend network)的概念。所谓前端设备就是 Guest 虚拟机看到的设备,后端设备、后端网络就是在 Host 宿主机上的设备、网络。下图实际上大致对应这样的命令: qemu -net nic,model=e1000 -net bridge,br=bridge1 …

设置网桥

下图是/etc/netplan 目录里的配置文件。

参考:
netplan: https://ubuntu.com/blog/ubuntu-bionic-netplan
图形化管理工具: https://ubuntu.com/core/docs/networkmanager
网络管理服务: https://www.configserverfirewall.com/ubuntu-linux/ubuntu-network-manager/
创建网桥: http://www.zrway.com/news/8366.html

利用docker在本地搭建nginx环境

利用 docker 搭建如下拓扑结构的的实验环境。相关代码可以在 https://github.com/dhyuan/dockerEnv/tree/main/nginx 获得。

Topology

1) 网路

首先创建一个网络’nginx-net’,把所有的相关的容器放在这个网络里以方便容器之间的访问。
这个网络地址范围 172.50.0.0/16。相关脚本 createNetwork.sh。

docker network create --subnet=172.50.0.0/16 nginx-net

2) 创建并运行 Nginx 容器 ‘nginxA’, ‘nginxB’, ‘nginxC’.

并把个容器的 IP 固定下来防止容器重启后 IP 变化以影响日志观察。
把相关配置影射到本地方便修改。相关脚本 startNginx.sh。

下面是配置 nginx 容器 nginxA。

docker run --name nginxA \
--network nginx-net --ip 172.50.0.11 \
-v ~/dockerEnv/nginx/nginxA/html:/usr/share/nginx/html:ro \
-v ~/dockerEnv/nginx/nginxA/config/conf.d:/etc/nginx/conf.d \
-v ~/dockerEnv/nginx/nginxA/config/nginx.conf:/etc/nginx/nginx.conf:ro \
-v ~/dockerEnv/nginx/nginxA/log:/var/log/nginx \
-p 18080:9090 \
-d nginx:1.22.0

Nginx 镜像不包含 ping,curl 这样的常用工具,可以自行安装。

docker exec -it nginxA sh
apt-get update
apt-get install iputils-ping curl

3) 创建一个容器作为 client

因为 nginx 容器都在自己的网络 nginx-net 里,为了方便测试通过创建一个 Alpine 容器作为客户端环境。Alpine 容器以-it 交互模式运行。

docker run --name nclient100 --network nginx-net --ip 172.50.0.100 -it alpine:3.16.0

Alpine 镜像没有 curl 命令,可以通过 apk 安装。

apk add curl

可以在 console 发送 curl 命令进行测试。

curl http://nginxA:9090/testProxy/index.html
curl -H "X-FORWARDED-FOR: 12.3.4.5"  http://nginxA:9090/testProxy/index.html

4) Play around

因为 ngix 容器的配置、日志、html 目录都映射到了本地,所以可以本地的这些内容进行测试、验证。
可以根据自己的需要修改 nginxA(B|C)/config/confd/9090.conf 里的**”location /testProxy”** 来进行相关 nginx 相关参数的验证。

查看各容器的 IP,可运行 ./showInfo.sh
清除各容器的 nginx 日志,可运行 ./cleanlog.sh
修改 nginx 的配置后进行语法检查,可运行 ./testCfg.sh
变更了 nginx 的配置使之生效,可运行 ./reload.sh

5) Demo

下图显示了一个测试获取真实用户 IP 的场景。

为了查看 HTTP header 里的 X_REAL_IP 字段,在‘log_format main’里加上了 “ | $http_x_real_ip” ,这样在 access.log 就就可以看到 nginx 接收到的 HEADER X_REAL_IP 的值。

从图里 nginxC 的 access.log 中可以看到,nginxC 收到的 X_REAL_IP 是 ngixA 的 IP。如果我们希望这里的 X_REAL_IP 记录的是真正 web client 的 IP 就需要把 nginxB 中的指令 “proxy_set_header X-Real-IP $remote_addr;” 注释掉。这样,ngixC中收到的X_REAL_IP就是 nginxA中看到的$remote_addr,及 ngixA 看到的 client 的 IP 了。

X-FORWARDED_FOR_demo

在Jenkins中使用Allure显示测试结果

allure 官网对自己的定义:

Allure Framework is a flexible lightweight multi-language test report tool that not only shows a very concise representation of what have been tested in a neat web report form …

1)首先在开发机器中安装 allure,

参考官方文档 https://docs.qameta.io/allure/#_installing_a_commandline 安装 allure。这里要注意的是如果是通过下载压缩包解压进行的安装,那么需要把 allure 连接到/usr/bin/allsure 这样的系统目录,否则 allure-maven 插件运行时找不到 allure command。如果是 mac 系统,因为权限管理的问题,可以链接到/usr/local/bin/allure。

2) 配置 pom 文件保证 allure 在本地可工作。

配置 maven 项目使用的是哪种测试框架来配置 pom 文件,可参考:https://docs.qameta.io/allure/#_java
运行以下命令确认本地环境可以正常生成 allure report。

mvn clean test allure:report
mvn allure:serve

3)在 jenkins 中安装 allure 插件并配置。

如图,安装 allure plugin
安装allure插件

如图,配置 allure tool,这里选择用到 allure 时自动下载。
配置allure工具

4)修改 Jenkinsfile 添加 allure 相关 step。

allure 相关配置大致如下:

post {
    always {
        junit testResults: "**/target/surefire-reports/*.xml"

        script {
          allure includeProperties: false, jdk: '', results: [[path: 'target/allure-results/']]
        }
    }

    success {
        archiveArtifacts 'target/*.jar'
    }
}

当把对 pom 和 jenkinsfile 的修改 push 到 GitLab 之后,因为之前已经配置了 pipeline 可被 gitlab 事件触发执行。顺利的话,就可在 Jenkins 中看到 allure 生成的报表了。

报表入口:
注意,生成的 allure-report.zip 并不能解压后直接通过浏览器查看,需要通过运行 allure open ./allure-report.zip 来查看。

可视化图表:

Reference:
https://docs.qameta.io/allure/
https://github.com/jenkinsci/allure-plugin

通过Jenkins pipeline build Maven项目

如之前的文章安装 CoreDNS、GitLab、Jenkins 容器 所述熟悉了基本的容器安装之后就可以配置 Jenkins pipeline 构建基于 maven 的 Java 项目了。

1)安装 JDK 及 Maven

不要通过 apt install 的方式安装,而通过下载 zip 解压的方式安装。

mkdir -p /home/devops/dockerSrvStorage/jenkins/tools/
cd /home/devops/dockerSrvStorage/jenkins/tools/
wget https://download.java.net/openjdk/jdk11/ri/openjdk-11+28_linux-x64_bin.tar.gz
tar xvzf openjdk-11+28_linux-x64_bin.tar.gz
wget https://mirrors.bfsu.edu.cn/apache/maven/maven-3/3.8.1/binaries/apache-maven-3.8.1-bin.zip
unzip apache-maven-3.8.1-bin.zip

2)启动 GitLab、Jenkins 容器。

停止并删除旧的 Jenkins, GitLab 容器:

docker stop jenkins gitlab
docker rm jenkins gitlab

Jenkins 和 GitLab 容器的互联互通至少有下面两种方式。一种是通过先定义出一个 network,然后 Jenkins 和 GitLab 加入这个网络来完成,这时容器间的访问是在 docker 自己内部网络上完成的而与宿主机 IP 无关。。一种是通过外部 DNS Server 完成容器间域名字的解析,这时容器间的访问是通过宿主机的 IP+映射到宿主机的端口完成的。

docker 网络内部互联的方式,如果是跨主机容器间通讯可以通过 overlay 完成。(如果跨主机这种方式,就没有理由不上 K8S 了。)下面是用于在同一宿主机下的命令:
首先创建一个网络:

docker network create devopNet

启动 Jenkins 和 GitLab 容器,注意这里通过–net 设定它们的网络都是 devNet

docker run --detach \
--name jenkins \
--net devopNet \
--hostname jenkins.telbox.cn \
--publish 9183:8080 --publish 9184:50000 \
--restart always \
-v /home/devops/dockerSrvStorage/jenkins:/var/jenkins_home \
jenkins/jenkins:2.277.4-lts-jdk11

docker run --detach \
--name gitlab \
--net devopNet \
--hostname gitlab.telbox.cn \
--publish 9181:443 --publish 9180:80 --publish 9182:22 \
--restart always \
-v /home/devops/dockerSrvStorage/gitlab/config:/etc/gitlab \
-v /home/devops/dockerSrvStorage/gitlab/logs:/var/log/gitlab \
-v /home/devops/dockerSrvStorage/gitlab/data:/var/opt/gitlab \
gitlab/gitlab-ce:13.11.3-ce.0

通过 DNS 来通许的容器,相当于容器可以使用传入的 DNS 服务,也就可以通过 DNS 中定义的机器名通讯了。这样 jenkins 容器和 gitlab 容器运行在不同的主机,DNS 解析不同宿主机 IP/域名。这种情形, Jenkins 容器 ssh 到非 22 的 GitLab 上(假设 gitlab 容器映射 22 端口到其宿主机的 9182 端口),那么就需要设置 Jenkins 里 ssh config 文件。比如下面的/home/devops/dockerSrvStorage/jenkins/.ssh/config 这样:

Host gitlab.telbox.cn
    Hostname gitlab.telbox.cn
    Port 9182
    User dhyuan

这样在 jenkins 容器的 ssh 才能默认连接到 gitlab 容器宿主机的 9182 端口。

docker run --detach \
--name jenkins \
--dns 192.168.0.114 \
--hostname jenkins.telbox.cn \
--publish 9183:8080 --publish 9184:50000 \
--restart always \
-v /home/devops/dockerSrvStorage/jenkins:/var/jenkins_home \
jenkins/jenkins:2.277.4-lts-jdk11

docker run --detach \
--name gitlab \
--dns 192.168.0.114 \
--hostname gitlab.telbox.cn \
--publish 9181:443 --publish 9180:80 --publish 9182:22 \
--restart always \
-v /home/devops/dockerSrvStorage/gitlab/config:/etc/gitlab \
-v /home/devops/dockerSrvStorage/gitlab/logs:/var/log/gitlab \
-v /home/devops/dockerSrvStorage/gitlab/data:/var/opt/gitlab \
gitlab/gitlab-ce:13.11.3-ce.0

另外,之前使用的 GitLab 容器镜像 gitlab/gitlab-ce:13.9.2-ce.0 的 Integration-Jenkins CI 部分有 bug,换用 gitlab/gitlab-ce:13.11.3-ce.0 后 webhook 就可以工作了。

3)配置 Jenkins 使用 JDK、Maven, 如图:

配置 JDK
设置JDK

配置 Maven
设置JDK

4)通过 ssh-gen 生成 RSA 公私钥对,并配置 Jenkins 和 GitLab

ssh-keygen -t rsa -b 2048 -C "Used for dhyuan GitLab"

把生成的私钥通过 Jenkins 界面:Dashboard –> Credentials –> System Global credentials 设置为 GitLab 账号的私钥。

把生成的公钥通过 GitLab 界面进行设置, 如下图:

4)创建一个 Pipeline 项目并以从 GitLab 获取 Jenkinsfile 的形式定义 pipeline。

设置 pipeline 文件来自 git 管理的仓库:

需要注意的是,如果 branch 设置为*/master,那么就不要勾选 Lightweight checkout。如果勾选了 Lightweight checkout
,可把 branch 设置为*/*。

设置 pipeline 可由 GitLab 代码仓库事件触发执行,并在其高级选项中生成 Secret token。

5) 设置当 GitLab 发生 push 等操作是触发 Jenkins Pipeline 运行。

如图设置完毕就可以通过点击测试按钮发送“push”消息来触发 pipeline 的运行了。

Spring响应式CSRF相关源码

上篇介绍了 CSRF 及 Spring Security 对防范 CSRF 所做的支持。Spring Security 实现了基于 Session 和 Cookie 的 Sychronizer Token Pattern,以防范 CSRF。并且默认是基于 session 的。

Spring Security 基于 Servlet 和 WebFlux 技术分别进行了实现。
其中,基于 Servlet 技术栈的实现代码是:

CsrfFilter:执行过滤、验证。
CsrfTokenRepository:存储 csrf token 的接口。
CookieCsrfTokenRepository: 用 cookie 保存 csrf token
HttpSessionCsrfTokenRepository:用session 保存csrf token
CsrfAuthenticationStrategy.onAuthentication() 每次请求生成新 csrf token

基于 WebFlux 技术栈的实现代码没有与 CsrfAuthenticationStrategy 对应的类,而是直接在 CsrfWebFilter.filter()–> continueFilterChain() –> … this.csrfTokenRepository.loadToken(exchange).switchIfEmpty(generateToken(exchange)) 完成了。

Filter 和 TokenRepository 都有对应的类,如下:

CsrfWebFilter
ServerCsrfTokenRepository
  CookieServerCsrfTokenRepository WebSessionServerCsrfTokenRepository

CookieServerCsrfTokenRepository WebSessionServerCsrfTokenRepository 的逻辑更直接一些。下面以注释的方式解读一下 CsrfWebFilter 代码。

除了基本的 fitler,flatMap 这些 operators,可以特别留意以下几个 operators:
switchIfEmpty()
filterWhen()
delayUntil()
Mono.defer()

public class CsrfWebFilter implements WebFilter {

  public static final ServerWebExchangeMatcher DEFAULT_CSRF_MATCHER = new DefaultRequireCsrfProtectionMatcher();

  /**
  * The attribute name to use when marking a given request as one that should not be
  * filtered.
  *
  * To use, set the attribute on your {@link ServerWebExchange}: <pre>
  *     CsrfWebFilter.skipExchange(exchange);
  * </pre>
  */
  private static final String SHOULD_NOT_FILTER = "SHOULD_NOT_FILTER" + CsrfWebFilter.class.getName();

  private ServerWebExchangeMatcher requireCsrfProtectionMatcher = DEFAULT_CSRF_MATCHER;

  // 默认基于Session保存csrf token。
  private ServerCsrfTokenRepository csrfTokenRepository = new WebSessionServerCsrfTokenRepository();

  private ServerAccessDeniedHandler accessDeniedHandler = new HttpStatusServerAccessDeniedHandler(
      HttpStatus.FORBIDDEN);

  private boolean isTokenFromMultipartDataEnabled;

  public void setAccessDeniedHandler(ServerAccessDeniedHandler accessDeniedHandler) {
    Assert.notNull(accessDeniedHandler, "accessDeniedHandler");
    this.accessDeniedHandler = accessDeniedHandler;
  }

  public void setCsrfTokenRepository(ServerCsrfTokenRepository csrfTokenRepository) {
    Assert.notNull(csrfTokenRepository, "csrfTokenRepository cannot be null");
    this.csrfTokenRepository = csrfTokenRepository;
  }

  /**
  * 如果我们的应用GET,HEAD,TRACE,OPTIONS 这些方法会改变应用的状态,就需要自定义matcher。
  * 自定义的matcher可以有更复杂的匹配逻辑,而不仅仅限于DefaultRequireCsrfProtectionMatcher 中所用的request method。
  * 可参考 DefaultRequireCsrfProtectionMatcher。
  *
  * @param requireCsrfProtectionMatcher
  */
  public void setRequireCsrfProtectionMatcher(ServerWebExchangeMatcher requireCsrfProtectionMatcher) {
    Assert.notNull(requireCsrfProtectionMatcher, "requireCsrfProtectionMatcher cannot be null");
    this.requireCsrfProtectionMatcher = requireCsrfProtectionMatcher;
  }

  /**
  * Specifies if the {@code CsrfWebFilter} should try to resolve the actual CSRF token
  * from the body of multipart data requests.
  * @param tokenFromMultipartDataEnabled true if should read from multipart form body,
  * else false. Default is false
  */
  public void setTokenFromMultipartDataEnabled(boolean tokenFromMultipartDataEnabled) {
    this.isTokenFromMultipartDataEnabled = tokenFromMultipartDataEnabled;
  }

  @Override
  public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    // 如果代码通过调用 CsrfWebFilter.skipExchange(exchange) 设置无需进行CSRF保护,则继续chain的下一个节点。
    if (Boolean.TRUE.equals(exchange.getAttribute(SHOULD_NOT_FILTER))) {
      return chain.filter(exchange).then(Mono.empty());
    }

    return this.requireCsrfProtectionMatcher.matches(exchange) // 根据 requireCsrfProtectionMatcher 进行匹配
        .filter(MatchResult::isMatch) // 如果需要CSRF保护。
        // 这里matchResult只是个占位符。过滤出已经设置了CsrfToken的exchange。
        .filter((matchResult) -> !exchange.getAttributes().containsKey(CsrfToken.class.getName()))
        // 验证这个csrf token,这个验证逻辑是CSRF的核心。根据csrf token是保存在session还是cookie,逻辑不同。
        // 详细逻辑参考 ServerCsrfTokenRepository 的两个实现 CookieServerCsrfTokenRepository WebSessionServerCsrfTokenRepository
        .flatMap((m) -> validateToken(exchange))
        // 校验文csrf token,继续chain的下个节点处理。
        .flatMap((m) -> continueFilterChain(exchange, chain))
        // switchIfEmpty实际是继续.filter(MatchResult::isMatch).filter((matchResult) -> 。。。) 这两个filter之后的情况。
        .switchIfEmpty(continueFilterChain(exchange, chain).then(Mono.empty()))
        .onErrorResume(CsrfException.class, (ex) -> this.accessDeniedHandler.handle(exchange, ex));
  }

  public static void skipExchange(ServerWebExchange exchange) {
    exchange.getAttributes().put(SHOULD_NOT_FILTER, Boolean.TRUE);
  }

  private Mono<Void> validateToken(ServerWebExchange exchange) {
    return this.csrfTokenRepository.loadToken(exchange) // 从session或者cookie中得到csrf token,作为比较基准。
        // 如果没有找到csrf token则抛出异常没有找到期望的csrf token。(之前版本的错误信息有误 https://github.com/spring-projects/spring-security/commit/a1083d9a5ce3fef8fa458a47e5a6b7a6576ec01e#diff-9c109ef13a33c07de3231051c08e424e664985d142f54ccd7830169d4adcadb1)
        // 注意这里用的defer而不是just用来避免不必要的对象创建。
        // switchIfEmpty() 感觉上就是reactive stream里的的if语句:如果是空流,则用另一个publisher。
        .switchIfEmpty(
            Mono.defer(() -> Mono.error(new CsrfException("An expected CSRF token cannot be found"))))
        /**
        * 判断用户从页面提交的csrf token和从session/cookie中保存的是否一致。
        * 注意,这里用了filterWhen而不是filter。表面的原因是containsValidCsrfToken()的返回值是Mono<Boolean>而不是Boolean。
        * 而更深层的原因应该是CsrfWebFilter不应该是阻塞的,因为这个filter要处理所有的request,所以需要异步地测试、过滤。
        * 感觉上也有点像flatMap,都是接收Publisher参数。
        **/
        .filterWhen((expected) -> containsValidCsrfToken(exchange, expected))
        // 如果没有找到有效的csrf token,那么就抛出异常。
        .switchIfEmpty(Mono.defer(() -> Mono.error(new CsrfException("Invalid CSRF Token")))).then();
  }

  private Mono<Boolean> containsValidCsrfToken(ServerWebExchange exchange, CsrfToken expected) {
    // 先从form里找用户提交的csrf token。
    return exchange.getFormData().flatMap((data) -> Mono.justOrEmpty(data.getFirst(expected.getParameterName())))
        // form 表单里没有,就从header里找
        .switchIfEmpty(Mono.justOrEmpty(exchange.getRequest().getHeaders().getFirst(expected.getHeaderName())))
        // 最后从multipart里找
        .switchIfEmpty(tokenFromMultipartData(exchange, expected))
        // 如果找到了,则和从session或cookie里得到的csrf token做比较
        .map((actual) -> equalsConstantTime(actual, expected.getToken()));
  }

  private Mono<String> tokenFromMultipartData(ServerWebExchange exchange, CsrfToken expected) {
    if (!this.isTokenFromMultipartDataEnabled) {
      return Mono.empty();
    }
    ServerHttpRequest request = exchange.getRequest();
    HttpHeaders headers = request.getHeaders();
    MediaType contentType = headers.getContentType();
    if (!contentType.includes(MediaType.MULTIPART_FORM_DATA)) {
      return Mono.empty();
    }
    return exchange.getMultipartData().map((d) -> d.getFirst(expected.getParameterName())).cast(FormFieldPart.class)
        .map(FormFieldPart::value);
  }

  private Mono<Void> continueFilterChain(ServerWebExchange exchange, WebFilterChain chain) {
    return Mono.defer(() -> {
      Mono<CsrfToken> csrfToken = csrfToken(exchange);
      exchange.getAttributes().put(CsrfToken.class.getName(), csrfToken);
      return chain.filter(exchange);
    });
  }

  private Mono<CsrfToken> csrfToken(ServerWebExchange exchange) {
    // 如果从repository中找到了csrf token就返回,否则就生成一个token返回。
    return this.csrfTokenRepository.loadToken(exchange).switchIfEmpty(generateToken(exchange));
  }

  /**
  * Constant time comparison to prevent against timing attacks.
  * @param expected
  * @param actual
  * @return
  */
  private static boolean equalsConstantTime(String expected, String actual) {
    if (expected == actual) {
      return true;
    }
    if (expected == null || actual == null) {
      return false;
    }
    // Encode after ensure that the string is not null
    byte[] expectedBytes = Utf8.encode(expected);
    byte[] actualBytes = Utf8.encode(actual);
    return MessageDigest.isEqual(expectedBytes, actualBytes);
  }

  private Mono<CsrfToken> generateToken(ServerWebExchange exchange) {
    // 委托给csrfTokenRepository 产生一个新token。
    return this.csrfTokenRepository.generateToken(exchange)
        // 注意这里的delayUntil()用法:直到token保存好之后,上步由csrfTokenRepository产生的Mono<CsrfToken>才继续走流水线。
        .delayUntil((token) -> this.csrfTokenRepository.saveToken(exchange, token));
  }

  private static class DefaultRequireCsrfProtectionMatcher implements ServerWebExchangeMatcher {

    // 如果我们应用在保证GET,HEAD,TRACE,OPTIONS等方法不改变系统状态,不需要进行CSRF保护,就可使用这个默认的matcher类。
    private static final Set<HttpMethod> ALLOWED_METHODS = new HashSet<>(
        Arrays.asList(HttpMethod.GET, HttpMethod.HEAD, HttpMethod.TRACE, HttpMethod.OPTIONS));

    @Override
    public Mono<MatchResult> matches(ServerWebExchange exchange) {
      return Mono.just(exchange.getRequest()) // 从exchagne得到request对象
          .flatMap((r) -> Mono.justOrEmpty(r.getMethod())) // 得到http request的method。
          .filter(ALLOWED_METHODS::contains) // 仅保留无需CSRF保护的方法
          .flatMap((m) -> MatchResult.notMatch()) // 过滤后,有无需CSRF保护的方法则不匹配(无需CSRF保护)。
          .switchIfEmpty(MatchResult.match()); // 过滤后,没有匹配到方法,则匹配(需CSRF保护)。
    }

  }

}

Reference:
https://docs.spring.io/spring-security/site/docs/5.4.5/reference/html5/#csrf

Spring对CSRF的防范

这篇基本上是Spring Security Reference关于 CSRF 部分的一个笔记,只是记录一下核心逻辑。其它很多细节还是要参考官方文档。

什么是 CSRF

跨站请求伪造。经典场景是:

1)受害者首先登录了银行网站
2)用户没有 longout 的情况下
3)用同一个浏览器访问了“坏”网站
4)“坏”网站有一个向银行网站提交业务请求的页面
5)诱使用户发送这个请求。

实际上利用有 XSS 漏洞,完全可以无需受害者参与利用 javascript 而自动触发第 4 第 5 步。

这个场景背后有些逻辑:

这里我们把浏览器等同于用户,有些数据是用户自己可见的,有些数据是浏览器自动处理、发送而用户对这些数据是无感知的(比如 SessionId)。
银行网站以 cookie 的形式把 sessionId 发送给浏览器(set-cookie),浏览器每次请求都会再自动带上 cookie(cookie)。
上面场景第 5 步虽然“坏”表单不是源于银行网站页面而是在第三方网站的页面上,但是浏览器发现目标地址是银行网站,因此会自动带上响应的 cookie,比如 JSESSION 就会随带着被发送了。从服务器的角度看,来自第 4 步的数据与正常数据没有任何差别,因为这个业务请求便会被执行。

归根结底,这种 CSRF 的问题是 cookie 和浏览器工作方式引起的。

解决方案

一个是主流的“Synchronizer Token Pattern”方法,另一个是渐成主流的“SameSite Attribute”。

因为SameSite Attribute的方式实施和理解比较容易,我们先说。
服务端利用 cookie 的 SameSite 属性可以禁止浏览器从外部站点发送请求时带上 cookie。比如下面的 cookie 就不会被放在由在第三方网页发起而目标是银行网站的请求上。这就自然解决了上面的 CSRF 的问题。

Set-Cookie: JSESSIONID=randomid; Domain=bank.example.com; Secure; HttpOnly; SameSite=Lax

Synchronizer Token Pattern
这个解决办法的原理是对来自浏览器的请求我们都回送一个随机数,下次浏览器再请求业务时需要在 header 里或者表单里带上这个随机数。这个随机数就是 csrf token。
这个办法之所以能防范 CSRF,是因为 sessionId 来自 cookie,而 csrf token 来自 header 或者 form。相当于分别在两条不同的路径上传递。

Spring Security 模块生成 csrf token 后可放在两个地方。Spring 默认的,随机数与 sessionId 关联,放在 session 里。另一个方式:随机数放在 cookie 里。

基于 Session 保存 csrf token

与 session 关联比较容易理解,下次浏览器发送请求过来,服务端就可以从 header 或 form 里取出来的 csrf token 与 session 中的随机数相比较来进行判断。

通过 cookie 保存 csrf 是怎么回事呢?如果 csrf token 通过 cookie 发送给浏览器,那这个随机数不就跟 JSESSIONID 一样了会被浏览器自动传回到服务器了吗?
是的,这个 csrf 会被浏览器传回给服务器,我们也正是利用这一点“保存”了 csrf token。之所以使用基于 cookie 的方式,是因为要针对前后端分离的情形让前端可以使用 javascript 获得 csrf token,并把这个 token 作为下次请求的 header 参数或者 form 参数传递给服务端。服务端所要做的就是对比来自 cookie 的 csrf token 和来自 header/form 里的 csrftoken 来做出判断。

下面代码设置使用 cookie 保存,注意使用 cookie 传递 token 需要把 cookie 的 HttpOnly 属性设置为 false,以便让 javascript 能读到此值。

@EnableWebSecurity
public class WebSecurityConfig extends
        WebSecurityConfigurerAdapter {

    @Override
    protected void configure(HttpSecurity http) {
        http
            .csrf(csrf -> csrf
                .csrfTokenRepository(CookieCsrfTokenRepository.withHttpOnlyFalse())
            );
    }
}