继上篇支付宝沙箱环境接入。
上次的文章遗留了一个问题:如何通过异步获取支付结果?
本篇将在分布式的场景下进行环境搭建,将应用分为支付服务以及客户服务,并且在获取异步支付结果的基础上,确保两个服务的最终结果同步,也就是确保支付后业务逻辑必定处理完成。
本文技术选型:Springboot 2.2+RabbitMQ+CentOS+Docker+MySql。
首先进行支付环境搭建。
公私钥申请以及配置文件修改参照上篇文章。
此处将上次文章中的支付服务改造成Springboot适用的代码。
我们需要三个接口,分别是:payment支付,returnUrl同步通知,notifyUrl异步通知。
此处解释一下同步通知与异步通知的区别:
同步通知仅仅在浏览器通知一个支付结果,此结果不代表支付的最终结果,并且如若关闭浏览器,之后的业务代码也不会执行,因此支付成功与否不得参照此通知。
异步通知接口由支付宝进行调用,是在支付宝处理完业务逻辑后,调用异步通知接口通知我们业务处理成功,程序执行完后必须打印输出“success”(不包含引号)。如果反馈给支付宝的字符不是 success 这7个字符,支付宝服务器会不断重发通知,直到超过24小时22分钟。一般情况下,25小时以内完成8次通知(通知的间隔频率一般是:4m,10m,10m,1h,2h,6h,15h)。
还有需要注意的一点:异步通知的触发必须要确保该接口可以被公网访问,如果该接口不能被公网访问,那么异步回调不成功,因此支付端需要在可被公网访问的服务器上运行!!!
业务逻辑可以在异步通知中进行处理,这样能确保只要支付成功,该逻辑一定会运行。
因此书写以下三个接口:
@RequestMapping("/payment")
@ResponseBody
public String payment(HttpServletRequest request) throws UnsupportedEncodingException, AlipayApiException {
AlipayClient alipayClient = new DefaultAlipayClient(AlipayConfig.gatewayUrl, AlipayConfig.app_id, AlipayConfig.merchant_private_key, "json", AlipayConfig.charset, AlipayConfig.alipay_public_key, AlipayConfig.sign_type);
//设置请求参数
AlipayTradePagePayRequest alipayRequest = new AlipayTradePagePayRequest();
alipayRequest.setReturnUrl(AlipayConfig.return_url);
alipayRequest.setNotifyUrl(AlipayConfig.notify_url);
//商户订单号,商户网站订单系统中唯一订单号,必填
String out_trade_no = new String(request.getParameter("tradeNo").getBytes("ISO-8859-1"),"UTF-8");
//付款金额,必填
String total_amount = new String(request.getParameter("totalAmount").getBytes("ISO-8859-1"),"UTF-8");
//订单名称,必填
String subject = new String(request.getParameter("subject").getBytes("ISO-8859-1"),"UTF-8");
//商品描述,可空
String body = new String(request.getParameter("body").getBytes("ISO-8859-1"),"UTF-8");
alipayRequest.setBizContent("{\"out_trade_no\":\""+ out_trade_no +"\","
+ "\"total_amount\":\""+ total_amount +"\","
+ "\"subject\":\""+ subject +"\","
+ "\"body\":\""+ body +"\","
+ "\"product_code\":\"FAST_INSTANT_TRADE_PAY\"}");
//请求
String result = alipayClient.pageExecute(alipayRequest).getBody();
return result;
}
@RequestMapping("/returnUrl")
@ResponseBody
public String returnUrl(HttpServletRequest request) throws UnsupportedEncodingException, AlipayApiException {
Map<String,String> params = new HashMap<String,String>();
Map<String,String[]> requestParams = request.getParameterMap();
for (Iterator<String> iter = requestParams.keySet().iterator(); iter.hasNext();) {
String name = (String) iter.next();
String[] values = (String[]) requestParams.get(name);
String valueStr = "";
for (int i = 0; i < values.length; i++) {
valueStr = (i == values.length - 1) ? valueStr + values[i]
: valueStr + values[i] + ",";
}
//乱码解决,这段代码在出现乱码时使用
valueStr = new String(valueStr.getBytes("ISO-8859-1"), "utf-8");
params.put(name, valueStr);
}
boolean signVerified = AlipaySignature.rsaCheckV1(params, AlipayConfig.alipay_public_key, AlipayConfig.charset, AlipayConfig.sign_type); //调用SDK验证签名
if(signVerified) {
//商户订单号
String out_trade_no = new String(request.getParameter("out_trade_no").getBytes("ISO-8859-1"),"UTF-8");
//支付宝交易号
String trade_no = new String(request.getParameter("trade_no").getBytes("ISO-8859-1"),"UTF-8");
//付款金额
String total_amount = new String(request.getParameter("total_amount").getBytes("ISO-8859-1"),"UTF-8");
return "支付成功,请确认!";
}else {
return "验签失败";
}
}
@RequestMapping("/notifyUrl")
@ResponseBody
public String notifyUrl(HttpServletRequest request) throws UnsupportedEncodingException, AlipayApiException {
Map<String,String> params = new HashMap<String,String>();
Map<String,String[]> requestParams = request.getParameterMap();
for (Iterator<String> iter = requestParams.keySet().iterator(); iter.hasNext();) {
String name = (String) iter.next();
String[] values = (String[]) requestParams.get(name);
String valueStr = "";
for (int i = 0; i < values.length; i++) {
valueStr = (i == values.length - 1) ? valueStr + values[i]
: valueStr + values[i] + ",";
}
//乱码解决,这段代码在出现乱码时使用
valueStr = new String(valueStr.getBytes("ISO-8859-1"), "utf-8");
params.put(name, valueStr);
}
boolean signVerified = AlipaySignature.rsaCheckV1(params, AlipayConfig.alipay_public_key, AlipayConfig.charset, AlipayConfig.sign_type); //调用SDK验证签名
if(signVerified) {//验证成功
//商户订单号
String out_trade_no = new String(request.getParameter("out_trade_no").getBytes("ISO-8859-1"),"UTF-8");
//支付宝交易号
String trade_no = new String(request.getParameter("trade_no").getBytes("ISO-8859-1"),"UTF-8");
//交易状态
String trade_status = new String(request.getParameter("trade_status").getBytes("ISO-8859-1"),"UTF-8");
if(trade_status.equals("TRADE_FINISHED")){
//判断该笔订单是否在商户网站中已经做过处理
//如果没有做过处理,根据订单号(out_trade_no)在商户网站的订单系统中查到该笔订单的详细,并执行商户的业务程序
//如果有做过处理,不执行商户的业务程序
//注意:
//退款日期超过可退款期限后(如三个月可退款),支付宝系统发送该交易状态通知
}else if (trade_status.equals("TRADE_SUCCESS")){
//判断该笔订单是否在商户网站中已经做过处理
//如果没有做过处理,根据订单号(out_trade_no)在商户网站的订单系统中查到该笔订单的详细,并执行商户的业务程序
//如果有做过处理,不执行商户的业务程序
//注意:
//付款完成后,支付宝系统发送该交易状态通知
}
return "success";
}else {//验证失败
return "fail";
//调试用,写文本函数记录程序运行情况是否正常
//String sWord = AlipaySignature.getSignCheckContentV1(params);
//AlipayConfig.logResult(sWord);
}
}
我们可以在异步通知中直接处理对应的业务逻辑,在同一个系统中没有任何问题,不过我们的系统是分布式的,也就是说,支付是单独的一个系统,如果我们要处理业务逻辑,那必须通过远程调用,但是如果我们的业务系统正在遭受网络波动,那很有可能,我们的业务逻辑就没有同步过去,也就是说,人家付了钱,并没有看到商品状态变为“已付款”。
所以说,这样并没有办法确保我们的服务前后一致,我们必须要做到,一旦用户支付,那么无论如何最后用户都要看到支付成功的状态。
这时候就需要引入RabbitMQ来协助我们完成了。
有了消息队列,我们就可以在完成支付后,将支付的结果存入队列,当业务端收到队列的消息时,就进行对应的业务处理,这样免去了远程调用,业务端的数据处理也可以在业务端本地完成。
那么来安装一个RabbitMQ。
这里通过Docker快速安装RabbitMQ。
1.在linux安装Docker
yum install docker
2.安装完成后,启动Docker
>systemctl start docker
3.输入 docker ‐v,看到版本号即安装成功。
4.安装RabbitMQ
docker pull rabbitmq:3-management
5.成功后输入docker images,获取对应的Image Id
6.启动RabbitMq(需要放行5672和15672端口)
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq 最后填入你的Image Id
7.进入管理界面(15672端口),输入用户名guest,密码guest即可登录。
配置成功后,我们需要创建一个用于接收消息的队列:
1.创建交换机:
输入名字,这里选择direct即可。
2.创建队列:
输入名字即可
3.点击队列名,将队列和交换机绑定
完成后将RabbitMq与SpringBoot绑定。
Maven依赖:支付应用。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置消息转换器
@Configuration
public class MyAMQPConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
在异步通知添加消息发送逻辑(通过RabbitTemplate):
Map<String, Object> map = new HashMap<>();
map.put("OutTradeNo",payment.getOutTradeNo());
map.put("TradeNo",payment.getTradeNo());
map.put("TradeStatus",payment.getTradeStatus());
rabbitTemplate.convertAndSend(交换机名, 队列名,map);
}
这样发送消息就完成了,同时在业务端配置消息监听器。
Maven依赖:业务端。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
同样配置消息转换器,然后配置RabbitListener监听消息。
@RabbitListener(queues = 队列名)
public void receive(Map<String, Object> map) {
String outTradeNo = (String)map.get("OutTradeNo");//接收信息
//处理业务逻辑
}
那这样就完成了通过MQ传递消息给业务端,从而处理业务逻辑的整个过程。
不过这样还不是最安全的。
消息队列并不能保证100%的消息送达率,也有可能会出现阻塞,处理失败等一系列预料之外的问题,同样消息接收端也并不能保证100%的成功接收并处理消息。
那如果消息队列出了岔子,最终我们还是达不到我们想要的效果。
所以这些还不够,我们必须继续添加控制来达成我们的目的。
如果说仅仅发一次消息不靠谱,那么我们是否可以让支付端不断发送消息,直到收到来自业务端业务处理成功的消息为止呢?
这样的话,即使出现漏发,网络波动等情况,不断重发消息基本能够保证有消息推送到业务端。
我们继续改造。
我们在支付端新建一张支付表,存入对应的支付订单编号以及一个状态码。
假设状态码0代表仍未收到业务端处理结果,1表示已经收到业务端的结果。
在异步通知中,我们仅仅将支付记录插入到数据表中。
而发送消息,我们可以建立一个定时任务,在状态码仍为0的情况下,不断给业务端发送消息。
@Scheduled(cron = "0/5 * * * * *")//每五秒执行一次
public void resend(){
Payment payment = new Payment();
payment.setStatus(0);
List<Payment> payments = paymentDao.select(payment);//查询状态码为0的支付记录
if(payments==null||payments.size()==0){
return;
}
for (Payment payment1 : payments) {
sendMessage(payment1);//遍历支付记录,发送消息。
}
}
然后建立一个回调接口,一旦业务端逻辑处理成功,便允许其调用该接口,将状态码改为1。
@ResponseBody
@RequestMapping("/updateStatus")
public String updateStatus(String id){
paymentService.updateStateById(id,1);//由业务端调用,将状态改为1
return "success";
}
此时由于状态码改变,支付端便不再发送消息。
业务端在业务处理完成后调用对应的修改状态接口:
@RabbitListener(queues = 队列名)
public void receive(Map<String, Object> map) {
String outTradeNo = (String)map.get("OutTradeNo");
//处理业务逻辑
restTemplate.getForObject(服务端接口, String.class);//调用业务逻辑处理成功接口,也可以使用队列发送。
}
如此修改,我们便可以确保业务端能够收到消息,并且支付端也可以及时了解业务的处理情况,根据具体情况决定是否继续发送消息。
但还有一个问题:现在的逻辑是每发一次消息,业务端都会进行一次业务处理,那么如果是多条消息,业务处理也会进行多次,这很可能导致业务端的数据发生错乱。
所以我们必须考虑一点,也就是接口的幂等。
简而言之就是,无论支付端发送了多少条消息,业务端的处理结果都要一致。
要完成这点也不是很难,我们需要给业务端也添加一张表(或者字段)表明消息的处理情况,假设0为等待处理,1为已处理。
那么每次收到消息,我们需要进行查询,判断该消息是否已被处理,只有仍未被处理的情况下,才会继续进行业务逻辑。
@RabbitListener(queues = 队列名)
public void receive(Map<String, Object> map) {
String outTradeNo = (String)map.get("OutTradeNo");
Payment paymentInsurance = new Payment();
paymentInsurance.setPaymentId(outTradeNo);
paymentInsurance.setPaymentStatus(1);
if(paymentMapper.selectCount(payment)>0){//判断状态是否为1,如果为1,直接结束,不为1则继续处理
return;
}else {
//处理业务逻辑
restTemplate.getForObject(服务端接口, String.class);//调用业务逻辑处理成功接口,也可以使用队列发送。
}
那这样,就保证了接口无论收到多少消息,处理结果均一致。
那么我们之前提到的问题也宣告完成,在这种情况下,一旦用户支付,那么无论如何最后用户都能看到支付成功的状态。
这就是MQ异步最终确保式分布式事务在支付和业务场景中的应用。