add cluster-dir-list.

This commit is contained in:
kaichao 2023-10-22 17:27:44 +08:00
parent 315bb341d7
commit d40d283a6c
11 changed files with 249 additions and 7 deletions

View File

@ -1,4 +1,4 @@
dirs:=cron list-dir ftp-copy rsync-copy rsyncd data-grouping-2d
dirs:=cron list-dir ftp-copy rsync-copy rsyncd data-grouping-2d cluster-file-copy cluster-dir-list
build:
@for dir in $(dirs); do \

View File

@ -1,11 +1,26 @@
# dockerfiles
## Introduction
scalebox应用中常见的公用模块。
## file-related
| 模块名 | 模块描述 |
| ---- | ---- |
| cluster-dir-list | 集群内目录的文件列表 |
| cluster-file-copy | 基于rsync-over-ssh的跨集群文件复制 |
| list-dir | 本地或远程目录的文件列表，支持本地目录、远端rsync目录、远端rsync-over-ssh目录、远端ftp目录等 |
| rsync-copy | 基于rsync-over-ssh、rsync的远端文件复制 |
| ftp-copy | 基于ftp的远端文件复制 |
| rsyncd | rsync的服务端 |
## data-grouping-2d
基于2维数据集的数据分组是数据处理中的常见模式。将数据集中的数据实体按id组织为2维数据集，并支持在x、y方向上对数据进行分组。
## cron
定时消息生成模块,以启动后续模块。消息体一般用当前时间戳来表示。
## list-dir
## actuator
支持在标准actuator模块中自动生成自定义的公私钥。
## ftp-copy
## rsync-copy
## rsyncd
## node-manager
单个节点上的actuator，用于在当前节点上启动slot。

View File

@ -0,0 +1,30 @@
# Image for the cluster-dir-list module: Debian 12 (slim) plus rsync, the
# OpenSSH client and zstd, with the module scripts installed and goagent as
# the entrypoint.
FROM debian:12-slim
LABEL maintainer="kaichao"
# install the newest version, rsync 3.2.7, openssh 9.3p1
# The Debian "testing" suite is added as an extra apt source before installing,
# so packages may be taken from testing; apt caches and temp files are removed
# in the same layer to keep the image small.
RUN echo "deb http://deb.debian.org/debian testing main" > /etc/apt/sources.list.d/bookworm-testing.list \
&& apt-get update \
&& apt-get install -y rsync openssh-client zstd \
&& apt-get clean autoclean \
&& apt-get autoremove -y \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Reuse root's ssh directory from the actuator image
# (presumably the key pair used for rsync-over-ssh — TODO confirm).
COPY --from=hub.cstcloud.cn/scalebox/actuator /root/.ssh /root/.ssh
# Module settings, all empty by default.
# SOURCE_CLUSTER and REGEX_FILTER are read by list-files.sh; SOURCE_CLUSTER,
# REGEX_2D_DATASET and INDEX_2D_DATASET are read by run.sh.
# NOTE(review): JUMP_SERVER_OPTION is not referenced by run.sh or list-files.sh
# as shipped with this image — confirm where it is consumed.
ENV SOURCE_CLUSTER= \
REGEX_FILTER= \
REGEX_2D_DATASET= \
INDEX_2D_DATASET= \
# 'yes'
JUMP_SERVER_OPTION=
# Module scripts: run.sh is the per-message entry script, list-files.sh the
# directory lister it calls.
COPY run.sh /app/bin/
COPY list-files.sh /usr/local/bin/
# Binaries taken from other scalebox images; /usr/local/sbin presumably
# provides goagent, send-message and the scalebox CLI — TODO confirm.
COPY --from=hub.cstcloud.cn/scalebox/base /usr/local/sbin/ /usr/local/sbin/
COPY --from=hub.cstcloud.cn/scalebox/list-dir /usr/local/bin/get_2d_meta /usr/local/bin/
# /work/.scalebox holds the cluster parameter cache written by list-files.sh.
# The escaped \${PATH} is written literally into .bashrc, so it is expanded
# when the shell starts rather than at build time.
RUN mkdir -p /work/.scalebox /app/bin && echo "PATH=/app/bin:\${PATH}" >> /root/.bashrc
WORKDIR /work
ENTRYPOINT [ "goagent" ]

View File

@ -0,0 +1,13 @@
# Build, publish and distribute the cluster-dir-list docker image.
IMAGE_NAME:=hub.cstcloud.cn/scalebox/cluster-dir-list

# All targets are commands, not files; declaring them phony keeps them working
# even if a file named "build", "push", "clean" or "dist" appears here.
.PHONY: build push clean dist

# Build the image from the Dockerfile in this directory (BuildKit, host network).
build:
	DOCKER_BUILDKIT=1 docker build --network=host -t $(IMAGE_NAME) .

# Push the image to the registry named in IMAGE_NAME.
push:
	docker push $(IMAGE_NAME)

# Remove the local copy of the image.
clean:
	docker rmi $(IMAGE_NAME)

# Stream the image, zstd-compressed, over ssh to host c0 and load it there.
dist:
	docker save $(IMAGE_NAME) | zstdmt | pv | ssh c0 'zstd -d | docker load'

View File

@ -0,0 +1,26 @@
# cluster-dir-list
## 模块介绍
集群内目录的文件列表。远端目录基于rsync-over-ssh实现。
## 环境变量
- SOURCE_CLUSTER: 若为空,则再由消息体确定
- JUMP_SERVER_OPTION: "source"
- REGEX_FILTER:
- REGEX_2D_DATASET:
- INDEX_2D_DATASET:
## 输入消息格式
[<SOURCE_CLUSTER>~]<RELATIVE_PATH>#<local_dir>
- 若环境变量为空,则以输入消息中的<SOURCE_CLUSTER>来替代
- RELATIVE_PATH：相对CLUSTER_DATA_ROOT的相对根目录，不包含在产生的消息体中
- local_dir：本地路径，包含在产生的消息体中
## 用户应用的退出码
- 0 : OK
## 输出消息格式
- 若退出码为0，则输出与输入消息相同的消息。
- 退出码非0则不输出消息

View File

@ -0,0 +1,82 @@
#!/bin/bash
# list-files.sh — print the files below one data directory, one path per line.
#
# Usage: list-files.sh <dir1>#<dir2>
#   dir1  directory below the cluster data root; not part of the output
#   dir2  sub-directory of dir1; every output line starts with it
#
# Environment:
#   SOURCE_CLUSTER  empty: list the local directory of cluster ${CLUSTER_NAME}
#                   set:   list that cluster's directory via rsync-over-ssh
#   REGEX_FILTER    extended regex; only matching paths are printed
#   ssh_args        extra ssh options used in remote mode, if set
#
# stdout carries only the file list (it is consumed by run.sh); all
# diagnostics go to stderr.
#
# Exit code: 0 on success (including "everything filtered out"), otherwise
# the status of the failing pipeline stage.

cache_file=/work/.scalebox/cluster_data.txt

# Load the cached cluster parameters; each line is "<cluster> <value>".
declare -A cluster_map
if [ -e "${cache_file}" ]; then
    while read line; do
        # separate key and value
        key=$(echo $line | cut -d " " -f 1)
        value=$(echo $line | cut -d " " -f 2)
        # an empty key is not a valid array subscript; skip blank lines
        [ -n "$key" ] || continue
        cluster_map[$key]=$value
    done < "${cache_file}"
fi

if [ "$SOURCE_CLUSTER" == "" ]; then
    cluster=$CLUSTER_NAME
else
    cluster=$SOURCE_CLUSTER
fi

# Look the cluster up in the cache first.
v=""
if [ -n "$cluster" ]; then
    v=${cluster_map[$cluster]}
fi

# Ask scalebox only on a cache miss, then remember the answer.
if [ -z "$v" ]; then
    v=$(scalebox cluster get-parameter --cluster $cluster rsync)
    code=$?
    # diagnostics must not pollute the file list on stdout
    [[ $code -ne 0 ]] && echo cmd: get_cluster_rsync, error_code:$code >&2 && exit $code
    if [ -n "$cluster" ] && [ -n "$v" ]; then
        cluster_map[$cluster]=$v
        echo $cluster $v >> "${cache_file}"
    fi
fi

# The parameter has the form "<rsync_prefix>#<ssh_port>".
rsync_prefix=$(echo $v | cut -d "#" -f 1)
ssh_port=$(echo $v | cut -d "#" -f 2)

dir1=$(echo $1 | cut -d "#" -f 1)
dir2=$(echo $1 | cut -d "#" -f 2)

# NOTE: nothing may run between the pipelines below and the PIPESTATUS
# snapshot after "fi"; the status analysis depends on it.
if [ "$SOURCE_CLUSTER" == "" ]; then
    # local dir: the part of rsync_prefix after ":" is the cluster data root
    cluster_root=$(echo $rsync_prefix | cut -d ":" -f 2)
    # set /local to support symlink
    data_dir="/local${cluster_root}/${dir1}/${dir2}"
    echo local data-dir:${data_dir} >&2
    # replace the leading "." of each path by dir2
    cd ${data_dir} && find -L . -type f \
        | sed "s/^\./${dir2}/" \
        | grep -E "${REGEX_FILTER}"
else
    # rsync-over-ssh: dry-run listing; keep regular files ("-" entries),
    # take the path column, drop its first component and prefix dir2
    echo remote data-dir:${rsync_prefix}/${dir1}/${dir2} >&2
    rsync -avn -L -e "ssh -p ${ssh_port} ${ssh_args}" ${rsync_prefix}/${dir1}/${dir2} \
        | grep '^-' | awk '{print $5}' \
        | awk '{ gsub(/^[^\/]+?\//,""); print $0 }' \
        | sed "s/^/${dir2}\//" \
        | grep -E "${REGEX_FILTER}"
fi

# exit status of grep -E
#   0 if a line is selected
#   1 if no lines were selected
#   2 if an error occurred.
status=(${PIPESTATUS[@]})
echo "[INFO]pipe_status:"${status[*]} >&2

n=${#status[*]}
# A single status means the local "cd" failed and the pipeline never ran.
if [ $n == 1 ]; then
    if [ ${status[0]} -ne 0 ]; then
        echo "[ERROR]local mode, dir: "${data_dir}" not found" >&2
        exit ${status[0]}
    fi
fi

# Walk the stages from last to first and stop at the first real failure.
declare -i code
for ((i=n-1; i>=0; i--)); do
    code=${status[i]}
    if [ $i == $((n-1)) ]; then
        # the filter stage returning 1 only means "no line matched"
        if [ $code == 1 ]; then
            echo "[WARN]All of data are filtered, empty dataset!" >&2
            code=0
        fi
    fi
    if [ $code -ne 0 ]; then
        break
    fi
done

exit $code

View File

@ -0,0 +1,37 @@
#!/bin/bash
# run.sh — per-message entry script of the cluster-dir-list module.
#
# Usage: run.sh <message>
#   message  [<SOURCE_CLUSTER>~]<RELATIVE_PATH>#<local_dir>
#
# Environment:
#   SOURCE_CLUSTER    if empty, taken from the part of the message before "~"
#   REGEX_2D_DATASET  if set, 2d-dataset metadata is computed and registered
#   INDEX_2D_DATASET  passed to get_2d_meta together with REGEX_2D_DATASET
#
# Lists the directory with list-files.sh and sends one message per file.
# Exit code: 0 on success, otherwise the status of the failing command.

dir=$1

if [ "$SOURCE_CLUSTER" == "" ]; then
    # extract SOURCE_CLUSTER from message-body
    # ${SOURCE_CLUSTER}~${RELATIVE_PATH}#${local_dir}
    if [[ "$dir" =~ ^([^~]*)~([^#]+)#([^#]+)$ ]]; then
        SOURCE_CLUSTER=${BASH_REMATCH[1]}
        dir=${BASH_REMATCH[2]}#${BASH_REMATCH[3]}
    fi
fi
# list-files.sh is a child process and reads SOURCE_CLUSTER from its
# environment, so make sure the value is exported.
export SOURCE_CLUSTER

echo dir-name:$dir

if [ "$REGEX_2D_DATASET" ]; then
    # The regex is quoted so the shell does not glob-expand its brackets.
    # NOTE(review): $? below is the status of get_2d_meta only; a failure of
    # list-files.sh in this pipeline is not detected here.
    meta=$(list-files.sh $dir | get_2d_meta "$REGEX_2D_DATASET" $INDEX_2D_DATASET)
    code=$?
    [[ $code -ne 0 ]] && echo cmd: get_2d_meta, error_code:$code && exit $code

    echo ${meta} > /work/key-text.txt
    echo metadata for 2d-dataset:#${meta}#
    # key text in file /work/key-text.txt
    scalebox task add
    code=$?
    [[ $code -ne 0 ]] && echo cmd: scalebox task add, error_code:$code && exit $code
fi

# The loop runs in a subshell (right-hand side of a pipe): "exit" inside it
# only ends the loop, so its status is picked up from PIPESTATUS afterwards.
# An if-statement is used so a successful iteration leaves status 0.
list-files.sh $dir | while read -r line; do
    send-message $line
    code=$?
    if [[ $code -ne 0 ]]; then
        echo "Error send-message, file:"$line >&2
        exit $code
    fi
done
status=("${PIPESTATUS[@]}")

# Check the sender first: when the loop stops early, list-files.sh may die
# from a broken pipe, which would hide the real cause.
if [[ ${status[1]} -ne 0 ]]; then
    exit ${status[1]}
fi

code=${status[0]}
[[ $code -ne 0 ]] && echo "Error run list-files.sh, dir:"$dir >&2 && exit $code

exit 0

View File

@ -0,0 +1,26 @@
# Test application for the cluster-dir-list module: a cluster-dir-list job
# whose output is sunk into a job named next-job.
# NOTE(review): the ${...} placeholders are presumably filled from the
# accompanying env file before submission — confirm.
# NOTE(review): key nesting is not visible in this rendering; lines are kept
# exactly as found.
name: cluster-dir-list.test
cluster: ${CLUSTER}
parameters:
initial_status: RUNNING
jobs:
# Job under test, running the cluster-dir-list image.
cluster-dir-list:
base_image: hub.cstcloud.cn/scalebox/cluster-dir-list
schedule_mode: HEAD
variables:
# 1 hour
max_sleep_count: 120
parameters:
start_message: ${START_MESSAGE}
# Passed to the container as environment variables; REGEX_FILTER is read by
# list-files.sh, the two *_2D_DATASET values by run.sh.
environments:
- REGEX_FILTER=${REGEX_FILTER}
- REGEX_2D_DATASET=${REGEX_FITS}
- INDEX_2D_DATASET=${INDEX_FITS}
sink_jobs:
- next-job
# Downstream job; its key grouping uses the same regex/index pair that the
# job above receives as REGEX_2D_DATASET / INDEX_2D_DATASET.
next-job:
base_image: hub.cstcloud.cn/scalebox/agent
parameters:
key_group_regex: ${REGEX_FITS}
key_group_index: ${INDEX_FITS}

View File

@ -0,0 +1,2 @@
~fits#Dec+6007_09_03
~fits#Dec+4120_03_03

View File

@ -0,0 +1,2 @@
main~fits#Dec+6007_09_03
main~fits#Dec+4120_03_03

View File

@ -0,0 +1,9 @@
# Settings for the cluster-dir-list test application.
# NOTE(review): presumably substituted into the ${...} placeholders of the
# application definition — confirm.
CLUSTER=qiu
# Start message source; the commented-out line is the alternative input.
# NOTE(review): "FILE:" presumably means the messages are read from that file — confirm.
START_MESSAGE=FILE:remote.txt
#START_MESSAGE=FILE:local.txt
# Extended regex used to filter the listed file paths.
REGEX_FILTER=^.+-M[0-9]{2}_000[1-9].+
# Regex with three capture groups, and the indexes of the groups to use.
REGEX_FITS=^([^/]+/[^/]+)/.+M([0-9]{2})_([0-9]{4}).+$
INDEX_FITS=1,2,3