阿里蚂蚁金服技术栈 一、SOFARPC 服务发布、引用以及调用的简单流程图如下:
当一个 SOFARPC 的应用启动时,如果发现当前应用需要发布 RPC 服务,那么 SOFARPC 会将该服务注册到配置中心,就是图中蓝色实线所示的过程。 当引用这个服务的 SOFA 应用启动时,会从配置中心订阅对应服务的地址,当配置中心收到订阅请求后,会将发布方的地址列表推送给订阅方,就是图中绿色实线所示的过程。 当引用服务的一方拿到地址以后,就可以调用服务了,就是图中蓝色虚线所示的过程。 使用步骤 依赖
1 2 3 4 <dependency> <groupId>com.alipay.sofa</groupId> <artifactId>rpc-enterprise-sofa-boot-starter</artifactId> </dependency>
服务发布 1、设计服务接口类
1 2 3 4 5 6 7 8 public interface SampleService { public String hello () ; }
2、编写服务实现类
1 2 3 4 5 6 7 8 9 10 public class SampleServiceImpl implements SampleService { @Override public String hello () { return "hello world" ; } }
3、在 Spring XML 中配置服务发布
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:sofa="http://schema.alipay.com/sofa/schema/slite" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://schema.alipay.com/sofa/schema/slite http://schema.alipay.com/sofa/slite.xsd"> <!-- 声明服务的实现对象,以下类全名和接口全名,请根据自己的包名进行指定 --> <bean id="sampleService" class="com.alipay.samples.rpc.impl.SampleServiceImpl"/> <!-- 发布 RPC 服务 --> <sofa:service ref="sampleService" interface="com.alipay.samples.rpc.SampleService"> <sofa:binding.bolt/> </sofa:service> </beans>
4、启动服务
直接运行项目中的 Application即可,框架会自动进行服务的发布。
服务引用 1、在 Spring XML 中配置服务引用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:sofa="http://schema.alipay.com/sofa/schema/slite" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://schema.alipay.com/sofa/schema/slite http://schema.alipay.com/sofa/slite.xsd"> <!-- 以下接口名请根据服务提供方的接口全名来指定 --> <sofa:reference id="sampleServiceRef" interface="com.alipay.samples.rpc.SampleService"> <sofa:binding.bolt> <!-- 等待配置中心返回地址的时间,如果在 5000 ms 内有地址返回的话,等待过程会立马结束 --> <sofa:global-attrs address-wait-time="5000" test-url="127.0.0.1:12200"/> </sofa:binding.bolt> </sofa:reference> </beans>
2、将服务引用对象注入业务代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @ImportResource({"classpath*:META-INF/demo/*.xml"}) @org.springframework.boot.autoconfigure.SpringBootApplication public class SOFABootWebSpringApplication { // init the logger private static final Logger logger = LoggerFactory.getLogger(SOFABootWebSpringApplication.class); public static void main(String[] args) { //下面这一行只有在本地同时启动客户端和服务端的时候需要,正式环境不可以写 System.setProperty("server.port", "8081"); SpringApplication springApplication = new SpringApplication(SOFABootWebSpringApplication.class); ApplicationContext applicationContext = springApplication.run(args); if (logger.isInfoEnabled()) { logger.info("application start"); } SampleService sampleService = (SampleService) applicationContext.getBean("sampleServiceRef"); String resp = sampleService.hello(); if (logger.isInfoEnabled()) { logger.info("the resp data is " + resp); } } }
发布及引用 BOLT 服务 BOLT 服务的名称来自于 RPC 使用的底层通信框架 BOLT。相对于传统的 WebService,BOLT 支持更加复杂的对象,序列化后的对象更小,且提供了更为丰富的调用方式(sync、oneway、callback、future 等),支持更广泛的应用场景。
发布一个 BOLT 的服务
1 2 3 4 5 <!-- 发布 BOLT 服务 --> <sofa:service interface="com.alipay.test.SampleService" ref="sampleService" unique-id="service1"> <sofa:binding.bolt/> </sofa:service>
引用一个 BOLT 的服务
1 2 3 4 <!-- 引用 BOLT 服务 --> <sofa:reference interface="com.alipay.test.SampleService" id="sampleService"> <sofa:binding.bolt/> </sofa:reference>
二、消息队列 消息队列主要涉及五个核心角色,消息发布者(Publisher)、消息代理组件(Message Broker)、消息订阅者(Subscriber)、消息类型(Message Type)和订阅关系(Binding),具体描述如下:
消息发布者 :指发送消息的应用系统。一个应用系统可以发送一种或者多种类型的消息,消息发布者将消息发送到消息代理组件。消息代理组件 :负责接收发布者发送的消息,根据消息类型和消息订阅元数据将消息分发投递到一个或多个消息订阅者。整个过程涉及消息类型校验、消息持久化存储、消息订阅关系匹配、消息投递和消息恢复等核心功能。消息订阅者 :指订阅消息的应用系统。一个应用系统可以订阅一种或者多种消息类型,消息订阅者收到的消息来自消息代理组件。消息类型 :一种消息类型由 TOPIC 和 EVENTCODE 唯一标识。消息订阅 :用来描述一种消息类型被哪些订阅者订阅,订阅元数据也被称为 Binding。使用步骤 1、确定消息类型
一种消息类型由 TOPIC 和 EVENTCODE 唯一标识。
参数 命名规则 示例 TOPIC 以 “TP_” 开头 TP_DEFAULT EVENTCODE 以 “EC_” 开头 EC_DEFAULT
2、本地开发
依赖
1 2 3 4 <dependency> <groupId>com.alipay.sofa</groupId> <artifactId>mq-enterprise-sofa-boot-starter</artifactId> </dependency>
application.properties文件 run.mode=DEV
发布者 (Publisher) 1 2 3 4 5 6 7 8 9 10 <sofa:publisher id="mqPublisher" group="P_appname_service"> <sofa:channels> <sofa:channel value="TP_DEFAULT"/> </sofa:channels> <sofa:binding.msg_broker/> </sofa:publisher> <bean id="mqService" class="com.antcloud.tutorial.mq.endpoint.service.MqService"> <property name="mqPublisher" ref="mqPublisher"/> </bean>
id 是 Spring Bean 的单例服务标识,可以被依赖注入到其它 Spring Bean 属性值中。group 命名格式为 P\_应用名\_服务名
,是发布者唯一标识,不允许存在两个 sofa:publisher
配置相同的 group 属性值。sofa:channel 元素的 value 值是此发布者发送的消息类型 TOPIC 值,如果发送消息类型涉及多个 TOPIC,必须配置多个 sofa:channel
元素。创建消息对象(UniformEvent)。 设置消息对象属性值。 发送消息。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public class MqService { private static final Logger logger = LoggerFactory.getLogger(MqService.class); private final static String TOPIC = "TP_DEFAULT"; private final static String EVENTCODE = "EC_DEFAULT"; private UniformEventPublisher mqPublisher; private UniformEventBuilder uniformEventBuilder = new DefaultUniformEventBuilder(); public boolean publish() { if (logger.isInfoEnabled()) { logger.info("Publish a message."); } /* Create a message instance. */ final UniformEvent message = uniformEventBuilder.buildUniformEvent(TOPIC, EVENTCODE); /* Set the business object as an event payload. */ message.setEventPayload(buildDefaultAccount()); /* Mark that a runtime exception must be thrown when publishing failure. */ message.setThrowExceptionOnFailed(true); boolean publishSuccess = false; try { /* Do publish action. */ mqPublisher.publishUniformEvent(message); publishSuccess = true; logger.info("Public a message, success. TOPIC [{}] EVENTCODE [{}] id [{}] payload [{}]", new Object[] { message.getTopic(), message.getEventCode(), message.getId(), message.getEventPayload() }); } catch (Exception e) { logger.error("Public a message, failure. TOPIC [{}] EVENTCODE [{}] error [{}]", new Object[] { message.getTopic(), message.getEventCode(), e.getMessage() }, e); } return publishSuccess; } private Account buildDefaultAccount() { Account account = new Account(); account.setId(UUID.randomUUID().toString()); account.setAmount(new Random().nextDouble()); account.setGmtCreate(new Date()); return account; } public void setMqPublisher(UniformEventPublisher mqPublisher) { this.mqPublisher = mqPublisher; } }
消息对象由 UniformEventBuilder
负责创建,必须指定 TOPIC 和 EVENTCODE 两个参数。消息负载封装在业务对象中,并设置为 uniformEvent 的 payload 属性值。消息发送通过 uniformEventPublisher
对象的 publisherUniformEvent
方法完成,此方法可能抛出运行时异常,如果应用程序预期捕获异常并处理,必须设置 uniformEvent 对象的属性值 throwExceptionOnFailed
为 true,否则运行时异常不会被抛出,只会记录错误日志。
消费者 (Consumer) 1 2 3 4 5 6 7 8 9 10 11 12 13 <!-- Declare a consumer bean with id "mqConsumer" --> <sofa:consumer id="mqConsumer" group="S_appname_service"> <sofa:listener ref="mqMessageListener"/> <sofa:channels> <sofa:channel value="TP_DEFAULT"> <sofa:event eventType="direct" eventCode="EC_DEFAULT" persistence="true"/> </sofa:channel> </sofa:channels> <sofa:binding.msg_broker/> </sofa:consumer> <!-- mq message listener --> <bean id="mqMessageListener" class="com.antcloud.tutorial.mq.endpoint.service.MqMessageListener"/>
id 是 Spring Bean 的单例服务标识,group 命名格式为 S\_应用名\_服务名
,是消费者唯一标识,不允许存在两个 sofa:consumer
配置相同的 group 属性值。sofa:listener 元素的 ref 属性值设置为消息接监听器单例服务标识。sofa:channel 元素的 value 值是此消费者订阅的消息类型 TOPIC 值,如果订阅的消息类型涉及多个 TOPIC,必须配置多个 sofa:channel
元素。sofa:event 元素配置具体的消息订阅信息,eventType 属性值默认设置为 direct
,eventCode 属性值设置为消息类型 eventcode 值,persistence 属性值设置为持久订阅(true)或者非持久订阅(false)。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class MqMessageListener implements UniformEventMessageListener { private static final Logger logger = LoggerFactory.getLogger(MqMessageListener.class); @Override public void onUniformEvent(UniformEvent message, UniformEventContext context) throws Exception { /* get TOPIC, EVENTCODE and payload from the message instance */ final String topic = message.getTopic(); final String eventcode = message.getEventCode(); final String id = message.getId(); final Object businessObject = message.getEventPayload(); logger.info("Receive a message, TOPIC [{}] EVENTCODE [{}] id [{}] payload [{}]", new Object[] { topic, eventcode, id, businessObject }); try { boolean processSuccess = processMessage(businessObject); if (!processSuccess) { /* Process the message failure, set cause and rollback, the message is re-delivered later. */ context.setContextDesc("process error cause"); context.setRollbackOnly(); } } catch (Exception e) { logger.error("Process a message, failure. TOPIC [{}] EVENTCODE [{}] id [{}] error {}", new Object[] { topic, eventcode, id, e.getMessage() }, e); /* When any exception is thrown, the message is re-delivered later. */ throw e; } } /* Process the business logic */ private boolean processMessage(Object businessObject) { return true; } }
订阅消息的应用系统必须实现 com.alipay.common.event.UniformEventMessageListener
接口并配置在 sofa:listener
元素的 ref 属性值中。当消息被消息消费者接收到时,com.alipay.common.event.UniformEventMessageListene.onUniformEvent
会被调用,应用系统通过实现此方法执行消息消费逻辑。
三、任务调度 依赖
1 2 3 4 <dependency> <groupId>com.alipay.sofa</groupId> <artifactId>scheduler-enterprise-sofa-boot-starter</artifactId> </dependency>
1、实现处理器接口
1 2 3 4 5 6 7 8 9 10 11 12 13 /** * 简单任务处理器接口 */ public interface ISimpleJobHandler extends IJobHandler { /** * 处理request * * @param context * @return */ ClientCommonResult handle(JobExecuteContext context); }
2、实现类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class AlwaysSuccessHandler implements ISimpleJobHandler { public static final String NAME = "ALWAYS_SUCCESS_JOB"; @Override public ClientCommonResult handle(JobExecuteContext context) throws InterruptedException { //处理业务逻辑 return ClientCommonResult.buildSuccessResult(); } @Override public ThreadPoolExecutor getThreadPool() { return null; } @Override public String getName() { return NAME; } }
说明 :当任务需要按多个步骤执行时,需要写多个实现类,并在控制台开启分步。
3、控制台配置发布
CRON 表达式 分为 6 或 7 个域,每一个域代表一个含义。CRON 有如下两种语法格式:
秒 分 小时 日期 月份 星期 年 秒 分 小时 日期 月份 星期 每个域允许的值
域 允许的数值 允许的特殊字符 备注 秒 0-59 - * / 分 0-59 - * / 小时 0-23 - * / 日期 1-31 - * ? / L W C 月份 1-12 JAN-DEC - * / 星期 1-7 SUN-SAT - * ? / L C # 1 表示星期天,2 表示星期一,依次类推 年(可选) 留空,1970-2099 , - * / 自动生成,工具不显示该值
示例
*/5 * * * * ?
每隔 5 秒执行一次0 */1 * * * ?
每隔 1 分钟执行一次0 0 2 1 * ? *
每月 1 日的凌晨 2 点执行一次0 15 10 ? * MON-FRI
周一到周五每天上午 10:15 执行作业0 15 10 ? 6L 2002-2006
2002 年至 2006 年的每个月的最后一个星期五上午 10:15 执行作业0 0 23 * * ?
每天 23 点执行一次0 0 1 * * ?
每天凌晨 1 点执行一次0 0 1 1 * ?
每月 1 日凌晨 1 点执行一次0 0 23 L * ?
每月最后一天 23 点执行一次0 0 1 ? * L
每周星期天凌晨 1 点执行一次0 26,29,33 * * * ?
在 26 分、29 分、33 分执行一次0 0 0,13,18,21 * * ?
每天的 0 点、13 点、18 点、21 点都执行一次0 0 10,14,16 * * ?
每天上午 10 点,下午 2 点,4 点执行一次0 0/30 9-17 * * ?
朝九晚五工作时间内每半小时执行一次0 0 12 ? * WED
每个星期三中午 12 点执行一次0 0 12 * * ?
每天中午 12 点触发0 15 10 ? * *
每天上午 10:15 触发0 15 10 * * ?
每天上午 10:15 触发0 15 10 * * ? *
每天上午 10:15 触发0 15 10 * * ? 2005
2005 年的每天上午 10:15 触发0 * 14 * * ?
每天下午 2 点到 2:59 期间的每 1 分钟触发0 0/5 14 * * ?
每天下午 2 点到 2:55 期间的每 5 分钟触发0 0/5 14,18 * * ?
每天下午 2 点到 2:55 期间和下午 6 点到 6:55 期间的每 5 分钟触发0 0-5 14 * * ?
每天下午 2 点到 2:05 期间的每 1 分钟触发0 10,44 14 ? 3 WED
每年三月的星期三的下午 2:10 和 2:44 触发0 15 10 ? * MON-FRI
周一至周五的上午 10:15 触发0 15 10 15 * ?
每月 15 日上午 10:15 触发0 15 10 L * ?
每月最后一日的上午 10:15 触发0 15 10 ? * 6L
每月的最后一个星期五上午 10:15 触发0 15 10 ? * 6L 2002-2005
2002 年至 2005 年的每月的最后一个星期五上午 10:15 触发0 15 10 ? * 6#3
每月的第三个星期五上午 10:15 触发API网关、分布式事务、数据同步服务…