Fork me on GitHub

人群(一)系统人群生成上线总结

系统人群生成部分运行指令总结

IMG_0833

一、基本配置信息

底表重复数据查询

1
2
3
4
5
6
7
8
9
10
SELECT *
FROM
(SELECT
jdpin_offset,
COUNT(1) AS cnt
FROM dmx_upf.dmxupf_width_jd_usr_pin_crd_s_d
WHERE dt = '2019-11-07'
GROUP BY jdpin_offset) a1
WHERE a1.cnt > 1
limit 1;

正则匹配 | 整数、小数

1
2
^[\\+\\-]?[\\d]+(\\.[\\d]+)?$
'^-?[0-9]+.?[0-9]*$'

基本底表

1
2
3
4
5
JCW_DMXUPF_WIDTH_JD_USR_PIN_ALL_TAGS_S_D    自动化合并宽表
JCW_DMXUPF_WIDTH_TAG_ENUM_STAT_A_D 标签集市统计计算
JCW_DMXUPF_WIDTH_AUTO_PUSH_CK_PROC_S_D 标签宽表推送CK
JCW_DMXUPF_WIDTH_BITMAP_PUSH_I_D 标签宽表bitmap计算与云推送
JCW_DMXUPF_WIDTH_TAG_VALUE_HBASE_A_D 标签宽表推送Hbase

堡垒机

1
2
3
4
5
6
7
8
9
Azkaban: 10.220.xx.xxx

HDFS:10.220.xx.xxx

BigData_Test: 10.222.xxx.xx

CK_Test:

CK 生产:

R2M

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
用户人群队列 / 日志:deviceBM-temp

企业人群队列:tag-manage

offset2pin: TagBM_C2_1

pin2offset: TagBM_C2_2

新消费(对外合作项目):dqp

大数据作业监控所需key:profile_monitor

business2offset

offset2business

人群所属队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 企业画像人群创建队列
val companyAsk_create = "QUEUE_CK2BITMAP_BUSI"
// 企业画像人群更新队列
val companyAsk_update = ""
// 待创建人群请求队列
val crowdCreateSqlKey = "hive_sql2bitmap_queue_todo"
// 待更新人群请求队列
val crowdUpdateSqlKey = "hive_sql2bitmap_batch_update_queue_todo"
// 上传人群请求队列
val crowdFromFileQueue = "user_bitmap_rule_load_queue_todo"
// 自定义人群请求队列
val customAsk="user_bitmap_rule_custom_queue_todo"
val customUpt="user_bitmap_rule_custom_batch_update_queue_todo"
// 传媒人群请求响应队列
val mediaAsk_file="media_user_bitmap_rule_load_queue_todo"
val mediaAsk_sql="media_hive_sql2bitmap_queue_todo"
// 人群二次筛选
val crowdFilterQueue="user_bitmap_rule_once_calculate_queue_todo"
// 智慧足迹文件人群请求和响应队列
val unicomAsk_portrait = "wisdom-queue-portrait-task-message"
val unicomAsk_crowd="wisdom-queue-crowd-task-message"
val unicomAskResponse = "wisdom-queue-task-result"
// 智慧足迹SQL人群请求和响应队列
val unicomSupply_portrait = "smartsteps-queue-task-message"
val unicomSupplyResponse_portrait = "smartsteps-queue-task-result"
// 智慧足迹回流人群存储
val unicomSupply_crowd = "daas-queue-task-message"
val unicomSupplyResponse_crowd = "daas-queue-task-result"

// 需要推送到CK的明细表清单(表名)
val pushCkUserDtlList="push_ck_user_table_list"
// 当前需要推送到CK的明细表队列
val pushCkUserDtlQueue="push_ck_user_table_queue"
// 要推送CK的列Key前缀
val pushCkSchemaPrefix="push_schema_"
val pushCkDtPrefix="push_dt_"

二、spark-shell基本执行语句

Sparksql查询命令

1
2
3
4
5
6
7
8
9
10
11
12
13
dmx_upf.dmxupf_width_jd_usr_pin_all_tags_s_d      标签大宽表
1.spark-shell --num-executors 300
2.val data = spark.read.table("dmx_upf.dmxupf_width_jd_usr_pin_all_tags_s_d").filter("dt = '2019-11-22'").select("pin004003001003","pin004002002039")
4.data.cache
5.data.filter("pin004003001003 > '2019-11-22'").count
6.data.select("pin004003001003").distinct.show
7.yarn application -list|grep wanghe143 模糊匹配
8.yarn application -kill application_id 杀掉应用
9.yarn application -status application_id 查询应用状态

spark.read.table("dmx_upf.dmxupf_width_jd_usr_pin_all_tags_s_d").filter("dt = '2019-11-04'").filter("pin004003001003 > '2019-11-22'").count

spark.read.table("dmx_upf.dmxupf_width_jd_usr_pin_all_tags_s_d").filter("dt = '2019-11-05'").select("pin010000001").filter("pin010000001 = 16").limit(1).show

三、人群上线指令操作

Linux操作命令 重要

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
mvn clean package  打包操作                    -P profile

sudo su - hadoop 切换到hadoop,并将hadoop环境变量也一起带过去

rz -y 从windows上下载文件并且文件存在时覆盖

TAB linux指令自动补全

mkdir file 创建文件夹

rm(rmdir) -rf file 删除文件夹

mv ../tag_crowd_former-1.0-SNAPSHOT.jar . 将本层文件移动至父层

ctrl + a 移动到段首

ctrl + w 删除行(以空格分割)

ctrl + d 断开ssh连接

ctrl + r C 查询历史执行记录

llen queue 查询队列内容

Hgetall queue 命令用于返回哈希表中,所有的字段和值。



其中,crowd_former 和 vop_submitter 代码上线过程指令如下:
crowd_former:通过小集群ssh到5k大集群
1. 登陆Azakaban
2. 进到hwang目录下
3. rz -y 将本地包上传至生产小集群(11台)
4. scp tag_crowd_former-1.0-SNAPSHOT.jar xxx@ip地址:/export/home/hwang/tmp_job
将包通过scp方式上传至5k大集群
5. ssh xxxx@ip地址 账号 + 密码 获得5k大集群操作权限
6. hdfs dfs -put -f tag_crowd_former-1.0-SNAPSHOT.jar job/jars 将包上传至5k大集群hdfs
7. hdfs dfs -ls job/jars 确认包的上传时间

特殊流程:手动执行人群生成spark指令 入参为(人群ID + 人群所属队列)
spark-submit
--master yarn --driver-memory 4G --num-executors 60 --executor-memory 8G --executor-cores 2
--jars guava-20.0.jar,RoaringBitmap-0.6.66.jar
--conf spark.executor.extraClassPath=guava-20.0.jar:RoaringBitmap-0.6.66.jar
--conf spark.driver.extraClassPath=guava-20.0.jar:RoaringBitmap-0.6.66.jar
--class com.jdd.data_asset.CrowdGeneMain tag_crowd_former-1.0-SNAPSHOT.jar
921d68ca-3c17-478c-9f27-36d9aff826cf hive_sql2bitmap_queue_todo


vop_submitter:通过etl账户控制人群生成代码提交到大集群的调度任务
1. sudo su - dwetl 登陆etl账户
2. rz -y 将本地包上传至生产集群
3. mv tag_vop_submitter-1.0-SNAPSHOT.jar tag_crowd_submitter.jar 对包进行重命名
4. mv tag_crowd_submitter.jar job/run_jar/ 将包移动至job/run_jar目录下
vop_submitter代码中,submitCmd脚本为从hdfs中拉取人群生成代码提交到大集群运行
5. hdfs dfs -put -f tag_crowd_submitter.jar job/jars 将包上传至5k大集群hdfs
6. hdfs dfs -ls job/jars 确认包的上传时间
7. azkaban上传脚本,并设置调度时间
su - dwetl -c "sh /home/dwetl/job/shell/run_from_hadoop.sh tag_crowd_submitter.jar 'scala -cp tag_crowd_submitter.jar com.jdd.data_asset.service_center.crowd_service.GeneRemoteCrowd SYS_SQL'"


特殊流程:手动执行某一人群的调度任务
scala -cp tag_crowd_submitter.jar com.jdd.data_asset.service_center.crowd_service.GeneRemoteCrowd SYS_SQL 系统人群生成
-------------本文结束感谢您的阅读-------------
大 吉 大 利!