继上篇支付宝沙箱环境接入。

上次的文章遗留了一个问题:如何通过异步获取支付结果?

本篇将在分布式的场景下进行环境搭建,将应用分为支付服务以及客户服务,并且在获取异步支付结果的基础上,确保两个服务的最终结果同步,也就是确保支付后业务逻辑必定处理完成。

本文技术选型:Springboot 2.2+RabbitMQ+CentOS+Docker+MySql。

首先进行支付环境搭建。

公私钥申请以及配置文件修改参照上篇文章。

此处将上次文章中的支付服务改造成Springboot适用的代码。

我们需要三个接口,分别是:payment支付,returnUrl同步通知,notifyUrl异步通知。

此处解释一下同步通知与异步通知的区别:

同步通知仅仅在浏览器通知一个支付结果,此结果不代表支付的最终结果,并且如若关闭浏览器,之后的业务代码也不会执行,因此支付成功与否不得参照此通知。

异步通知接口由支付宝进行调用,是在支付宝处理完业务逻辑后,调用异步通知接口通知我们业务处理成功,程序执行完后必须打印输出“success”(不包含引号)。如果反馈给支付宝的字符不是 success 这7个字符,支付宝服务器会不断重发通知,直到超过24小时22分钟。一般情况下,25小时以内完成8次通知(通知的间隔频率一般是:4m,10m,10m,1h,2h,6h,15h)。

还有需要注意的一点:异步通知的触发必须要确保该接口可以被公网访问,如果该接口不能被公网访问,那么异步回调不成功,因此支付端需要在可被公网访问的服务器上运行!!!

业务逻辑可以在异步通知中进行处理,这样能确保只要支付成功,该逻辑一定会运行。

因此书写以下三个接口:

@RequestMapping("/payment")
    @ResponseBody
    public String payment(HttpServletRequest request) throws UnsupportedEncodingException, AlipayApiException {
        AlipayClient alipayClient = new DefaultAlipayClient(AlipayConfig.gatewayUrl, AlipayConfig.app_id, AlipayConfig.merchant_private_key, "json", AlipayConfig.charset, AlipayConfig.alipay_public_key, AlipayConfig.sign_type);

        //设置请求参数
        AlipayTradePagePayRequest alipayRequest = new AlipayTradePagePayRequest();
        alipayRequest.setReturnUrl(AlipayConfig.return_url);
        alipayRequest.setNotifyUrl(AlipayConfig.notify_url);

        //商户订单号,商户网站订单系统中唯一订单号,必填
        String out_trade_no = new String(request.getParameter("tradeNo").getBytes("ISO-8859-1"),"UTF-8");
        //付款金额,必填
        String total_amount = new String(request.getParameter("totalAmount").getBytes("ISO-8859-1"),"UTF-8");
        //订单名称,必填
        String subject = new String(request.getParameter("subject").getBytes("ISO-8859-1"),"UTF-8");
        //商品描述,可空
        String body = new String(request.getParameter("body").getBytes("ISO-8859-1"),"UTF-8");

        alipayRequest.setBizContent("{\"out_trade_no\":\""+ out_trade_no +"\","
                + "\"total_amount\":\""+ total_amount +"\","
                + "\"subject\":\""+ subject +"\","
                + "\"body\":\""+ body +"\","
                + "\"product_code\":\"FAST_INSTANT_TRADE_PAY\"}");

        //请求
        String result = alipayClient.pageExecute(alipayRequest).getBody();

        return result;
    }
    @RequestMapping("/returnUrl")
    @ResponseBody
    public String returnUrl(HttpServletRequest request) throws UnsupportedEncodingException, AlipayApiException {
        Map<String,String> params = new HashMap<String,String>();
        Map<String,String[]> requestParams = request.getParameterMap();
        for (Iterator<String> iter = requestParams.keySet().iterator(); iter.hasNext();) {
            String name = (String) iter.next();
            String[] values = (String[]) requestParams.get(name);
            String valueStr = "";
            for (int i = 0; i < values.length; i++) {
                valueStr = (i == values.length - 1) ? valueStr + values[i]
                        : valueStr + values[i] + ",";
            }
            //乱码解决,这段代码在出现乱码时使用
            valueStr = new String(valueStr.getBytes("ISO-8859-1"), "utf-8");
            params.put(name, valueStr);
        }

        boolean signVerified = AlipaySignature.rsaCheckV1(params, AlipayConfig.alipay_public_key, AlipayConfig.charset, AlipayConfig.sign_type); //调用SDK验证签名

        if(signVerified) {
            //商户订单号
            String out_trade_no = new String(request.getParameter("out_trade_no").getBytes("ISO-8859-1"),"UTF-8");

            //支付宝交易号
            String trade_no = new String(request.getParameter("trade_no").getBytes("ISO-8859-1"),"UTF-8");

            //付款金额
            String total_amount = new String(request.getParameter("total_amount").getBytes("ISO-8859-1"),"UTF-8");

            return "支付成功,请确认!";
        }else {
            return "验签失败";
        }
    }
    @RequestMapping("/notifyUrl")
    @ResponseBody
    public String notifyUrl(HttpServletRequest request) throws UnsupportedEncodingException, AlipayApiException {
        Map<String,String> params = new HashMap<String,String>();
        Map<String,String[]> requestParams = request.getParameterMap();
        for (Iterator<String> iter = requestParams.keySet().iterator(); iter.hasNext();) {
            String name = (String) iter.next();
            String[] values = (String[]) requestParams.get(name);
            String valueStr = "";
            for (int i = 0; i < values.length; i++) {
                valueStr = (i == values.length - 1) ? valueStr + values[i]
                        : valueStr + values[i] + ",";
            }
            //乱码解决,这段代码在出现乱码时使用
            valueStr = new String(valueStr.getBytes("ISO-8859-1"), "utf-8");
            params.put(name, valueStr);
        }

        boolean signVerified = AlipaySignature.rsaCheckV1(params, AlipayConfig.alipay_public_key, AlipayConfig.charset, AlipayConfig.sign_type); //调用SDK验证签名

        
        if(signVerified) {//验证成功
            //商户订单号
            String out_trade_no = new String(request.getParameter("out_trade_no").getBytes("ISO-8859-1"),"UTF-8");

            //支付宝交易号
            String trade_no = new String(request.getParameter("trade_no").getBytes("ISO-8859-1"),"UTF-8");

            //交易状态
            String trade_status = new String(request.getParameter("trade_status").getBytes("ISO-8859-1"),"UTF-8");

            if(trade_status.equals("TRADE_FINISHED")){
                //判断该笔订单是否在商户网站中已经做过处理
                //如果没有做过处理,根据订单号(out_trade_no)在商户网站的订单系统中查到该笔订单的详细,并执行商户的业务程序
                //如果有做过处理,不执行商户的业务程序

                //注意:
                //退款日期超过可退款期限后(如三个月可退款),支付宝系统发送该交易状态通知
            }else if (trade_status.equals("TRADE_SUCCESS")){
                //判断该笔订单是否在商户网站中已经做过处理
                //如果没有做过处理,根据订单号(out_trade_no)在商户网站的订单系统中查到该笔订单的详细,并执行商户的业务程序
                //如果有做过处理,不执行商户的业务程序
                //注意:
                //付款完成后,支付宝系统发送该交易状态通知
            }

            return "success";

        }else {//验证失败
            return "fail";

            //调试用,写文本函数记录程序运行情况是否正常
            //String sWord = AlipaySignature.getSignCheckContentV1(params);
            //AlipayConfig.logResult(sWord);
        }
    }

我们可以在异步通知中直接处理对应的业务逻辑,在同一个系统中没有任何问题,不过我们的系统是分布式的,也就是说,支付是单独的一个系统,如果我们要处理业务逻辑,那必须通过远程调用,但是如果我们的业务系统正在遭受网络波动,那很有可能,我们的业务逻辑就没有同步过去,也就是说,人家付了钱,并没有看到商品状态变为“已付款”。

所以说,这样并没有办法确保我们的服务前后一致,我们必须要做到,一旦用户支付,那么无论如何最后用户都要看到支付成功的状态。

这时候就需要引入RabbitMQ来协助我们完成了。

有了消息队列,我们就可以在完成支付后,将支付的结果存入队列,当业务端收到队列的消息时,就进行对应的业务处理,这样免去了远程调用,业务端的数据处理也可以在业务端本地完成。

那么来安装一个RabbitMQ。

这里通过Docker快速安装RabbitMQ。

1.在linux安装Docker
yum install docker
2.安装完成后,启动Docker
>systemctl start docker
3.输入 docker ‐v,看到版本号即安装成功。

4.安装RabbitMQ
docker pull rabbitmq:3-management
5.成功后输入docker images,获取对应的Image Id

6.启动RabbitMq(需要放行5672和15672端口)
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq 最后填入你的Image Id
7.进入管理界面(15672端口),输入用户名guest,密码guest即可登录。

rabbit

配置成功后,我们需要创建一个用于接收消息的队列:

1.创建交换机:

输入名字,这里选择direct即可。

rabbit2

2.创建队列:

输入名字即可

rabbit4

3.点击队列名,将队列和交换机绑定

rabbit3-1652184246486

完成后将RabbitMq与SpringBoot绑定。

Maven依赖:支付应用。

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置消息转换器

@Configuration
public class MyAMQPConfig {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

在异步通知添加消息发送逻辑(通过RabbitTemplate):

        Map<String, Object> map = new HashMap<>();
        map.put("OutTradeNo",payment.getOutTradeNo());
        map.put("TradeNo",payment.getTradeNo());
        map.put("TradeStatus",payment.getTradeStatus());
        rabbitTemplate.convertAndSend(交换机名, 队列名,map);
    }

这样发送消息就完成了,同时在业务端配置消息监听器。

Maven依赖:业务端。

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-rabbit</artifactId>
        </dependency>

同样配置消息转换器,然后配置RabbitListener监听消息。

@RabbitListener(queues = 队列名)
    public void receive(Map&lt;String, Object&gt; map) {
        String outTradeNo = (String)map.get("OutTradeNo");//接收信息
       //处理业务逻辑
    }

那这样就完成了通过MQ传递消息给业务端,从而处理业务逻辑的整个过程。

不过这样还不是最安全的。

消息队列并不能保证100%的消息送达率,也有可能会出现阻塞,处理失败等一系列预料之外的问题,同样消息接收端也并不能保证100%的成功接收并处理消息。

那如果消息队列出了岔子,最终我们还是达不到我们想要的效果。

所以这些还不够,我们必须继续添加控制来达成我们的目的。

如果说仅仅发一次消息不靠谱,那么我们是否可以让支付端不断发送消息,直到收到来自业务端业务处理成功的消息为止呢?

这样的话,即使出现漏发,网络波动等情况,不断重发消息基本能够保证有消息推送到业务端。

我们继续改造。

我们在支付端新建一张支付表,存入对应的支付订单编号以及一个状态码。

假设状态码0代表仍未收到业务端处理结果,1表示已经收到业务端的结果。

在异步通知中,我们仅仅将支付记录插入到数据表中。

而发送消息,我们可以建立一个定时任务,在状态码仍为0的情况下,不断给业务端发送消息。

@Scheduled(cron = "0/5 * * * * *")//每五秒执行一次
    public void resend(){
        Payment payment = new Payment();
        payment.setStatus(0);
        List&lt;Payment&gt; payments = paymentDao.select(payment);//查询状态码为0的支付记录
        if(payments==null||payments.size()==0){
            return;
        }
        for (Payment payment1 : payments) {
            sendMessage(payment1);//遍历支付记录,发送消息。
        }
    }

然后建立一个回调接口,一旦业务端逻辑处理成功,便允许其调用该接口,将状态码改为1。

   @ResponseBody
   @RequestMapping("/updateStatus")
   public String updateStatus(String id){
       paymentService.updateStateById(id,1);//由业务端调用,将状态改为1
       return "success";
   }

此时由于状态码改变,支付端便不再发送消息。

业务端在业务处理完成后调用对应的修改状态接口:

@RabbitListener(queues = 队列名) 
public void receive(Map&lt;String, Object&gt; map) { 
String outTradeNo = (String)map.get("OutTradeNo"); 
//处理业务逻辑 
restTemplate.getForObject(服务端接口, String.class);//调用业务逻辑处理成功接口,也可以使用队列发送。
}

如此修改,我们便可以确保业务端能够收到消息,并且支付端也可以及时了解业务的处理情况,根据具体情况决定是否继续发送消息。

但还有一个问题:现在的逻辑是每发一次消息,业务端都会进行一次业务处理,那么如果是多条消息,业务处理也会进行多次,这很可能导致业务端的数据发生错乱。

所以我们必须考虑一点,也就是接口的幂等。

简而言之就是,无论支付端发送了多少条消息,业务端的处理结果都要一致。

要完成这点也不是很难,我们需要给业务端也添加一张表(或者字段)表明消息的处理情况,假设0为等待处理,1为已处理。

那么每次收到消息,我们需要进行查询,判断该消息是否已被处理,只有仍未被处理的情况下,才会继续进行业务逻辑。

@RabbitListener(queues = 队列名) 
public void receive(Map&lt;String, Object&gt; map) { 
String outTradeNo = (String)map.get("OutTradeNo");
Payment paymentInsurance = new Payment();
paymentInsurance.setPaymentId(outTradeNo);
paymentInsurance.setPaymentStatus(1); 
if(paymentMapper.selectCount(payment)&gt;0){//判断状态是否为1,如果为1,直接结束,不为1则继续处理
            return;
        }else {
//处理业务逻辑 
restTemplate.getForObject(服务端接口, String.class);//调用业务逻辑处理成功接口,也可以使用队列发送。 
}

那这样,就保证了接口无论收到多少消息,处理结果均一致。

那么我们之前提到的问题也宣告完成,在这种情况下,一旦用户支付,那么无论如何最后用户都能看到支付成功的状态。

这就是MQ异步最终确保式分布式事务在支付和业务场景中的应用。