csdn学院招募微信小程序讲师啦
程序猿全指南,让【移动开发】更简单!
【观点】移动原生app开发 pk html 5开发
云端应用征文大赛,秀绝招,赢无人机!
分类:
凯发娱发k8的版权声明:本文为博主原创文章,未经博主允许不得转载。
目录(?)[ ]
- rocketmq试用简单的spring集成
- rocketmq
核心原理
- 1 数据结构
2 刷盘策略
3 内存机制
4 工作模式
环境安装
- 1 java环境安装
2 rocketmq安装
测试网络拓扑
启停操作
运维指令
基本测试
宕机实验
遗留问题
rocketmqspring源码下载
参考文献
经过2天的试用初步了解了一下rocketmq的基本用法,搜索了一下度娘,没有找到spring的例子,所以简单搞了一点代码感受一下。
1.rocketmq
rocketmq的前身是metaq,当metaq3.0发布时,产品名称改为rocketmq,有以下特点:
1) 能够保证严格的消息顺序
2) 提供丰富的消息拉取模式
3) 高效的订阅者水平扩展能力
4)实时的消息订阅机制
5)亿级消息堆积能力
2.核心原理
2.1. 数据结构
(1)所有数据单独储存到commit log ,完全顺序写,随机读
(2)对最终用户展现的队列实际只储存消息在commit log 的位置信息,并且串行方式刷盘
(3)按照messageid查询消息
(4)根据查询的key的hashcode%slotnum得到具体的槽位置
(5)根据slotvalue(slot对应位置的值)查找到索引项列表的最后一项
(6)遍历索引项列表返回查询时间范围内的结果集
2.2. 刷盘策略
rocketmq中的所有消息都是持久化的,先写入系统pagecache,然后刷盘,可以保证内存与磁盘都有一份数据,访问时,可以直接从内存读取
使用简单的符号标识不同的标题,将某些文字标记为粗体或者斜体,创建一个链接等,详细语法参考帮助?。
本编辑器支持 markdown extra , 扩展了很多好用的功能。具体请参考[github][2].
2.3. 内存机制
2.4. 工作模式
3. 环境安装
3.1. java环境安装
安装
rpm -ivh jdk-7u80-linux-x64.rpm
1
1
环境变量
java_home=/usr/java/jdk1.7.0_80
classpath=.:$java_home/lib.tools.jar
path=$java_home/bin:$path
export java_home classpath path
export rocketmq_home=/usr/local/service/alibaba-rocketmq
1
2
3
4
5
1
2
3
4
5
3.2. rocketmq安装
https://github.com/alibaba/rocketmq/releases下载3.2.6,解压
4. 测试网络拓扑
因为手里没有其他服务器,105那台缺少一个slave,在同步双写模式下,发送消息会返回
slave_not_available
,不过消息已经发送成功,只是slave没有写成功。
5. 启停操作
这里只给出一个基本的示例,各个模式的启停在本文最后的参考文献中会有详细的说明。这里不再赘述。
启动nameserver
nohup ./mqnamesrv &
1
1
停止nameserver
./mqshutdown namesrv
1
1
启动broker(单master)(多master,多master slave)对应的(异步复制,同步双写)
nohup sh mqbroker -n 192.168.146.109:9876 -c $rocketmq_home/conf/2m-noslave/broker-a.properties &
1
1
停止broker
./mqshutdown broker
1
1
6. 运维指令
查看集群情况
./mqadmin clusterlist -n 127.0.0.1:9876
1
1
查看broker状态
./mqadmin brokerstatus -n 127.0.0.1:9876 -b 192.168.146.105:10911
1
1
查看topic列表
./mqadmin topiclist -n 127.0.0.1:9876
1
1
查看topic状态
./mqadmin topicstatus -n 127.0.0.1:9876 -t pushtopic
1
1
查看topic路由
./mqadmin topicroute -n 127.0.0.1:9876 -t pushtopic
1
1
7. 基本测试
基本测试采用java直接编码的方式生产和消费消息,例子来源于参考文献的《rocketmq开发教程》。本文最后的代码示例,采用了spring的形式。
producer
package com.jd.wxz;
import com.alibaba.rocketmq.client.producer.defaultmqproducer;
import com.alibaba.rocketmq.client.producer.sendresult;
import com.alibaba.rocketmq.common.message.message;
public class producer {
public static void main(string[] args){
defaultmqproducer producer = new defaultmqproducer("producer");
producer.setnamesrvaddr("192.168.146.109:9876");
try {
producer.start();
message msg = new message("pushtopic",
"push",
"1",
"just for test.".getbytes());
sendresult result = producer.send(msg);
system.out.println("id:" result.getmsgid()
" result:" result.getsendstatus());
msg = new message("pushtopic",
"push",
"2",
"just for test.".getbytes());
result = producer.send(msg);
system.out.println("id:" result.getmsgid()
" result:" result.getsendstatus());
msg = new message("pulltopic",
"pull",
"1",
"just for test.".getbytes());
result = producer.send(msg);
system.out.println("id:" result.getmsgid()
" result:" result.getsendstatus());
} catch (exception e) {
e.printstacktrace();
}finally{
producer.shutdown();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
consumer
package com.sean;
import java.util.list;
import com.alibaba.rocketmq.client.consumer.defaultmqpushconsumer;
import com.alibaba.rocketmq.client.consumer.listener.consumeconcurrentlycontext;
import com.alibaba.rocketmq.client.consumer.listener.consumeconcurrentlystatus;
import com.alibaba.rocketmq.client.consumer.listener.messagelistenerconcurrently;
import com.alibaba.rocketmq.common.consumer.consumefromwhere;
import com.alibaba.rocketmq.common.message.message;
import com.alibaba.rocketmq.common.message.messageext;
public class consumer {
public static void main(string[] args){
defaultmqpushconsumer consumer =
new defaultmqpushconsumer("pushconsumer");
consumer.setnamesrvaddr("192.168.146.109:9876");
try {
//订阅pushtopic下tag为push的消息
consumer.subscribe("pushtopic", "push");
//程序第一次启动从消息队列头取数据
consumer.setconsumefromwhere(
consumefromwhere.consume_from_first_offset);
consumer.registermessagelistener(
new messagelistenerconcurrently() {
public consumeconcurrentlystatus consumemessage(
list list,
consumeconcurrentlycontext context) {
message msg = list.get(0);
system.out.println(msg.tostring());
return consumeconcurrentlystatus.consume_success;
}
}
);
consumer.start();
} catch (exception e) {
e.printstacktrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
运行结果:
服务端监控:
8.宕机实验
9.遗留问题
1) 关闭master 自动切换到slave无法实现,官方资料上没有明确指明,第三方文档里有(见文献3)。
2) 在开发机服务器上运行os.sh进行优化,导致网络无法连接,运维帮忙重启才恢复。
10.rocketmq spring源码下载
戳我
11.参考文献
1) 《rocketmq入门(上)》
2) 《rocketmq入门(下)》
3) 《rokectmq开发教程》
4) 《阿里rocketmq quict start》
5) 《rocketmq与kafka对比(18项差异)》
6) 《rocketmq命令整理》
7) 《rocketmq原理简介》
- 顶
- 0
- 踩
- 0
.blog-ass-articl dd {
color: #369;
width: 99%; /*修改行*/
float: left;
overflow: hidden;
font: normal normal 12px/23px "simsun";
height: 23px;
margin: 0;
padding: 0 0 0 10px;
margin-right: 30px;
background: no-repeat 0 10px;
}
参考知识库
linux知识库
9149关注|3461收录
java se知识库
20836关注|468收录
java ee知识库
13999关注|1215收录
java 知识库
22124关注|1436收录
软件测试知识库
3312关注|310收录
算法与数据结构知识库
12521关注|2320收录
- 猜你在找
更多资料请参考:
查看评论
暂无评论
发表评论
用 户 名:
liuguoyun_123456
评论内容:
html/xmlobjective-cdelphirubyphpc#c javascriptvisual basicpythonjavacsssql其它
* 以上用户言论只代表其个人观点,不代表csdn网站的观点或立场
.tag_list
{
background: none repeat scroll 0 0 #ffffff;
border: 1px solid #d7cbc1;
color: #000000;
font-size: 12px;
line-height: 20px;
list-style: none outside none;
margin: 10px 2% 0 1%;
padding: 1px;
}
.tag_list h5
{
background: none repeat scroll 0 0 #e0dbd3;
color: #47381c;
font-size: 12px;
height: 24px;
line-height: 24px;
padding: 0 5px;
margin: 0;
}
.tag_list h5 a
{
color: #47381c;
}
.classify
{
margin: 10px 0;
padding: 4px 12px 8px;
}
.classify a
{
margin-right: 20px;
white-space: nowrap;
}
全部主题
hadoop
aws
移动游戏
java
android
ios
swift
智能硬件
docker
openstack
vpn
spark
erp
ie10
eclipse
crm
javascript
数据库
ubuntu
nfc
wap
jquery
bi
html5
spring
apache
.net
api
html
sdk
iis
fedora
xml
lbs
unity
splashtop
uml
components
windows mobile
rails
qemu
kde
cassandra
cloudstack
ftc
coremail
ophone
couchbase
云计算
ios6
rackspace
web app
springside
maemo
compuware
大数据
aptech
perl
tornado
ruby
hibernate
thinkphp
hbase
pure
solr
angular
cloud foundry
redis
scala
django
bootstrap
#popup_mask
{
position: absolute;
width: 100%;
height: 100%;
background: #000;
z-index: 9999;
left: 0px;
top: 0px;
opacity: 0.3;
filter: alpha(opacity=30);
display: none;
}