优化对账单生成逻辑,完成使用websocket给客户端推送消息

This commit is contained in:
mzr 2024-09-04 16:09:15 +08:00
parent 802e339ecf
commit 564aa86e5c
9 changed files with 388 additions and 30 deletions

View File

@ -114,7 +114,8 @@ public class CheckAccountApi {
} else { } else {
bills = ListOrMapForJson.getlistForJson(jsonStr); bills = ListOrMapForJson.getlistForJson(jsonStr);
} }
ExecutorService pool = Executors.newFixedThreadPool(4); ExecutorService pool = Executors.newFixedThreadPool(1);
List<Map<String, Object>> mapList = new ArrayList<>();
Map<String, Object> map = null; Map<String, Object> map = null;
for (int i = 0; i < bills.size(); i++) { for (int i = 0; i < bills.size(); i++) {
map = new HashMap<>(); map = new HashMap<>();
@ -122,12 +123,14 @@ public class CheckAccountApi {
map.put("flag", "1"); map.put("flag", "1");
map.put("mdate", mdate); map.put("mdate", mdate);
map.put("createUser", createUser); map.put("createUser", createUser);
GenerateCheckThread thread = new GenerateCheckThread("1", map, checkMonthService, checkYearService, null, null, companyId); mapList.add(map);
//提交线程池 执行
pool.submit(thread);
bills.remove(i); bills.remove(i);
i = i - 1; i = i - 1;
} }
GenerateCheckThread thread = new GenerateCheckThread("1", mapList, checkMonthService, checkYearService,
null, null, companyId, "");
//提交线程池 执行
pool.submit(thread);
//关闭线程池 //关闭线程池
pool.shutdown(); pool.shutdown();
return AjaxResult.success(); return AjaxResult.success();
@ -188,7 +191,8 @@ public class CheckAccountApi {
if (StringUtil.isEmpty(endTime)) { if (StringUtil.isEmpty(endTime)) {
endTime = DateUtil.getDay(); endTime = DateUtil.getDay();
} }
ExecutorService pool = Executors.newFixedThreadPool(4); ExecutorService pool = Executors.newFixedThreadPool(1);
List<Map<String, Object>> mapList = new ArrayList<>();
Map<String, Object> map = null; Map<String, Object> map = null;
for (int i = 0; i < bills.size(); i++) { for (int i = 0; i < bills.size(); i++) {
map.put("flag", flag); map.put("flag", flag);
@ -196,12 +200,14 @@ public class CheckAccountApi {
// 默认为本年度的,期间对账都是本年度的 // 默认为本年度的,期间对账都是本年度的
map.put("yearFlag", StringUtil.isNotEmpty(yearFlag) ? yearFlag : "0"); map.put("yearFlag", StringUtil.isNotEmpty(yearFlag) ? yearFlag : "0");
map.put("saleYear", saleYear); map.put("saleYear", saleYear);
GenerateCheckThread thread = new GenerateCheckThread(flag, map, checkMonthService, checkYearService, startTime, endTime, companyId); mapList.add(map);
//提交线程池 执行
pool.submit(thread);
bills.remove(i); bills.remove(i);
i = i - 1; i = i - 1;
} }
GenerateCheckThread thread = new GenerateCheckThread(flag, mapList, checkMonthService, checkYearService,
startTime, endTime, companyId, "");
//提交线程池 执行
pool.submit(thread);
//关闭线程池 //关闭线程池
pool.shutdown(); pool.shutdown();
return AjaxResult.success(); return AjaxResult.success();

View File

@ -82,6 +82,7 @@ public class CheckAccountController extends BaseController {
public Map<String, Object> createMonthReconce( public Map<String, Object> createMonthReconce(
@ApiParam(required = false, value = "客户编码 json(clientId,clientNo,clientName)") @RequestParam(required = false) String jsonStr, @ApiParam(required = false, value = "客户编码 json(clientId,clientNo,clientName)") @RequestParam(required = false) String jsonStr,
@ApiParam(required = true, value = "日期(年月)") @RequestParam(required = true) String mdate, @ApiParam(required = true, value = "日期(年月)") @RequestParam(required = true) String mdate,
@ApiParam(required = false, value = "webSocket客户端名称") @RequestParam(required = false) String webSocketName,
HttpServletRequest request) { HttpServletRequest request) {
String token = request.getHeader("token"); String token = request.getHeader("token");
if (!redisService.isKey(token)) { if (!redisService.isKey(token)) {
@ -118,7 +119,8 @@ public class CheckAccountController extends BaseController {
} else { } else {
bills = ListOrMapForJson.getlistForJson(jsonStr); bills = ListOrMapForJson.getlistForJson(jsonStr);
} }
ExecutorService pool = Executors.newFixedThreadPool(3); ExecutorService pool = Executors.newFixedThreadPool(1);
List<Map<String, Object>> mapList = new ArrayList<>();
Map<String, Object> map = null; Map<String, Object> map = null;
for (int i = 0; i < bills.size(); i++) { for (int i = 0; i < bills.size(); i++) {
map = new HashMap<>(); map = new HashMap<>();
@ -126,12 +128,14 @@ public class CheckAccountController extends BaseController {
map.put("mdate", mdate); map.put("mdate", mdate);
map.put("flag", "1"); map.put("flag", "1");
map.put("createUser", createUser); map.put("createUser", createUser);
GenerateCheckThread thread = new GenerateCheckThread("1", map, checkMonthService, checkYearService, null, null, companyId); mapList.add(map);
//提交线程池 执行
pool.submit(thread);
bills.remove(i); bills.remove(i);
i = i - 1; i = i - 1;
} }
GenerateCheckThread thread = new GenerateCheckThread("1", mapList, checkMonthService, checkYearService,
null, null, companyId, webSocketName);
//提交线程池 执行
pool.submit(thread);
//关闭线程池 //关闭线程池
pool.shutdown(); pool.shutdown();
return AjaxResult.success(); return AjaxResult.success();
@ -180,6 +184,7 @@ public class CheckAccountController extends BaseController {
@ApiParam(required = false, value = "结束时间") @RequestParam(required = false) String endTime, @ApiParam(required = false, value = "结束时间") @RequestParam(required = false) String endTime,
@ApiParam(required = false, value = "是否为当前销售年度的年对账单 0.是 1.否") @RequestParam(required = false) String yearFlag, @ApiParam(required = false, value = "是否为当前销售年度的年对账单 0.是 1.否") @RequestParam(required = false) String yearFlag,
@ApiParam(required = false, value = "销售年度") @RequestParam(required = false) String saleYear, @ApiParam(required = false, value = "销售年度") @RequestParam(required = false) String saleYear,
@ApiParam(required = false, value = "webSocket客户端名称") @RequestParam(required = false) String webSocketName,
HttpServletRequest request) { HttpServletRequest request) {
String token = request.getHeader("token"); String token = request.getHeader("token");
if (!redisService.isKey(token)) { if (!redisService.isKey(token)) {
@ -219,7 +224,8 @@ public class CheckAccountController extends BaseController {
if (StringUtil.isEmpty(endTime)) { if (StringUtil.isEmpty(endTime)) {
endTime = DateUtil.getDay(); endTime = DateUtil.getDay();
} }
ExecutorService pool = Executors.newFixedThreadPool(4); ExecutorService pool = Executors.newFixedThreadPool(1);
List<Map<String, Object>> mapList = new ArrayList<>();
Map<String, Object> map = null; Map<String, Object> map = null;
for (int i = 0; i < bills.size(); i++) { for (int i = 0; i < bills.size(); i++) {
map = new HashMap<>(); map = new HashMap<>();
@ -229,15 +235,17 @@ public class CheckAccountController extends BaseController {
// 默认为本年度的,期间对账都是本年度的 // 默认为本年度的,期间对账都是本年度的
map.put("yearFlag", StringUtil.isNotEmpty(yearFlag) ? yearFlag : "0"); map.put("yearFlag", StringUtil.isNotEmpty(yearFlag) ? yearFlag : "0");
map.put("saleYear", saleYear); map.put("saleYear", saleYear);
GenerateCheckThread thread = new GenerateCheckThread(flag, map, checkMonthService, checkYearService, startTime, endTime, companyId); mapList.add(map);
//提交线程池 执行
pool.submit(thread);
bills.remove(i); bills.remove(i);
i = i - 1; i = i - 1;
} }
GenerateCheckThread thread = new GenerateCheckThread(flag, mapList, checkMonthService, checkYearService,
startTime, endTime, companyId, webSocketName);
//提交线程池 执行
pool.submit(thread);
//关闭线程池 //关闭线程池
pool.shutdown(); pool.shutdown();
return AjaxResult.success(); return AjaxResult.success("对账已成功发起,请耐心等待执行完成……");
} }
/** /**

View File

@ -1,10 +1,18 @@
package com.yb.lb.webapp.thread.check; package com.yb.lb.webapp.thread.check;
import com.alibaba.fastjson.JSONObject;
import com.yb.lb.common.utils.DateUtil;
import com.yb.lb.common.utils.SpringUtils;
import com.yb.lb.common.utils.StringUtil;
import com.yb.lb.webapp.checkaccount.service.CheckMonthService; import com.yb.lb.webapp.checkaccount.service.CheckMonthService;
import com.yb.lb.webapp.checkaccount.service.CheckYearService; import com.yb.lb.webapp.checkaccount.service.CheckYearService;
import com.yb.lb.webapp.websocket.config.WsConstants;
import com.yb.lb.webapp.websocket.handler.WebSocketHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.web.socket.TextMessage;
import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
@ -14,39 +22,52 @@ import java.util.Map;
**/ **/
public class GenerateCheckThread implements Runnable { public class GenerateCheckThread implements Runnable {
private String type; private String type;
// private List<Map<String, Object>> bills; private List<Map<String, Object>> bills;
private Map<String, Object> map; // private Map<String, Object> map;
private CheckMonthService checkMonthService; private CheckMonthService checkMonthService;
private CheckYearService checkYearService; private CheckYearService checkYearService;
private String startTime; private String startTime;
private String endTime; private String endTime;
private String companyId; private String companyId;
private String webSocketName;
Logger logger = LoggerFactory.getLogger(GenerateCheckThread.class); Logger logger = LoggerFactory.getLogger(GenerateCheckThread.class);
public GenerateCheckThread(String type, Map<String, Object> map, CheckMonthService checkMonthService, CheckYearService checkYearService, public GenerateCheckThread(String type, List<Map<String, Object>> bills, CheckMonthService checkMonthService, CheckYearService checkYearService,
String startTime, String endTime, String companyId) { String startTime, String endTime, String companyId, String webSocketName) {
this.type = type; this.type = type;
this.map = map; this.bills = bills;
this.checkMonthService = checkMonthService; this.checkMonthService = checkMonthService;
this.checkYearService = checkYearService; this.checkYearService = checkYearService;
this.startTime = startTime; this.startTime = startTime;
this.endTime = endTime; this.endTime = endTime;
this.companyId = companyId; this.companyId = companyId;
this.webSocketName = webSocketName;
} }
@Override @Override
public void run() { public void run() {
try { try {
if ("1".equals(type)) { String createTime = DateUtil.getTime();
// 月对账 for (Map<String, Object> map : bills) {
checkMonthService.addMonthBill(map, companyId); if ("1".equals(type)) {
} else if ("2".equals(type) || "3".equals(type)) { // 月对账
// 年对账/期间对账 checkMonthService.addMonthBill(map, companyId);
checkYearService.addYearBill(map, startTime, endTime, companyId); } else if ("2".equals(type) || "3".equals(type)) {
// 年对账/期间对账
checkYearService.addYearBill(map, startTime, endTime, companyId);
}
}
JSONObject reData = new JSONObject();
reData.put("data", null);
reData.put("msg", String.format("您在[%s]发起的对账单已经生成成功,请再次查询一下数据", createTime));
reData.put("code", WsConstants.CODE_RE_3002);
if (StringUtil.isNotEmpty(webSocketName)) {
WebSocketHandler webSocketHandler = SpringUtils.getBean(WebSocketHandler.class);
webSocketHandler.sendMessageToUser(webSocketName, new TextMessage(reData.toJSONString()));
} }
} catch (Exception e) { } catch (Exception e) {
logger.debug("生成对账单:" + e.getMessage()); logger.error("GenerateCheckThread-Exception" + e.getMessage());
e.printStackTrace(); e.printStackTrace();
} }
} }

View File

@ -360,7 +360,7 @@ public class RedisService {
public boolean set(String key,String value, int seconds){ public boolean set(String key,String value, int seconds){
Jedis jedis = this.getJedis(); Jedis jedis = this.getJedis();
jedis.select(15); jedis.select(1);
String result = jedis.set(key, value); String result = jedis.set(key, value);
jedis.expire(key, seconds); jedis.expire(key, seconds);
returnResource(jedis); returnResource(jedis);

View File

@ -49,6 +49,10 @@
<groupId>com.google.code.gson</groupId> <groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId> <artifactId>gson</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies> </dependencies>
<properties> <properties>

View File

@ -0,0 +1,38 @@
package com.yb.lb.webapp.websocket.config;
import com.yb.lb.webapp.websocket.handler.WebSocketHandler;
import com.yb.lb.webapp.websocket.interceptor.WebSocketHandshakeInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.handler.TextWebSocketHandler;
/**
* Spring WebSocket的配置这里采用的是注解的方式
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
//1.注册WebSocket
String websocketUrl = "/ws/socketServer"; //设置websocket的地址
registry.addHandler(webSocketHandler(), websocketUrl)
.addInterceptors(new WebSocketHandshakeInterceptor()) //注册Handler
.setAllowedOrigins("*"); //允许跨域
//2.注册SockJS提供SockJS支持(主要是兼容ie8)
String sockjsUrl = "/sockjs/socketServer"; //设置sockjs的地址
registry.addHandler(webSocketHandler(), sockjsUrl). //注册Handler
addInterceptors(new WebSocketHandshakeInterceptor()). //注册Interceptor
withSockJS(); //支持sockjs协议
}
@Bean
public TextWebSocketHandler webSocketHandler() {
return new WebSocketHandler();
}
}

View File

@ -0,0 +1,37 @@
package com.yb.lb.webapp.websocket.config;
/**
* WebSocket常量信息
*
* @author mzr
* @date 2024/9/3
*/
public class WsConstants {
//心跳包
public static final String CODE_1000 = "1000";
//心跳包返回消息
public static final String CODE_RE_1000 = "re1000";
//连接成功
public static final String CODE_1111 = "1111";
//错误消息
public static final String CODE_0000 = "0000";
//握手消息
public static final String CODE_2000 = "2000";
//握手数据包返回消息
public static final String CODE_RE_2000 = "re2000";
//PC端申请打包消息
public static final String CODE_3001 = "3001";
//PC端申请打包服务器返回消息
public static final String CODE_RE_3001 = "re3001";
//服务器打包需要下载的数据包后通知PC客户端消息
public static final String CODE_3002 = "3002";
//服务器打包需要下载的数据包后通知PC客户端返回消息
public static final String CODE_RE_3002 = "re3002";
}

View File

@ -0,0 +1,209 @@
package com.yb.lb.webapp.websocket.handler;
import com.alibaba.fastjson.JSONObject;
import com.yb.lb.common.utils.SpringUtils;
import com.yb.lb.webapp.redis.service.RedisService;
import com.yb.lb.webapp.websocket.config.WsConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
/**
* Websocket处理器
*/
public class WebSocketHandler extends TextWebSocketHandler {
private final static Logger logger = LoggerFactory.getLogger(WebSocketHandler.class);
//已建立连接的用户
public static final ArrayList<WebSocketSession> users = new ArrayList<>();
/**
* 处理前端发送的文本信息
* js调用websocket.send时候会调用该方法
*
* @param session
* @param message
* @throws Exception
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String userName = (String) session.getAttributes().get("WEBSOCKET_USERNAME");
// 获取提交过来的消息详情
// logger.info("收到用户 " + userName + "的消息:" + message.toString());
String payLoad = message.getPayload();
if (payLoad.length() > 10) {
// json字符串转换为map
Map messageMap = (Map) JSONObject.parse(message.getPayload());
// logger.info("收到用户 " + userName + "的消息:" + message);
//解析
if (messageMap.containsKey("code")) {
String msgType = messageMap.get("code").toString();
switch (msgType) {
case WsConstants.CODE_2000: {
//PC端握手消息
//返回握手成功消息
Map<String, Object> reData = new HashMap<>();
reData.put("data", "userId");
reData.put("msg", "应答握手数据包");
reData.put("code", WsConstants.CODE_RE_2000);
session.sendMessage(new TextMessage(JSONObject.toJSONString(reData)));
}
break;
case WsConstants.CODE_3001: {
//PC端申请打包
// ExecutorService pool = Executors.newFixedThreadPool(1);
// FilesToZipThread filesToZipThread = new FilesToZipThread(session, fileInfoService, map, recordSubmitCheckService, recordCheckDetailService, miFaultInfoService, fixedSubmitCheckService, overallSubmitCheckService, rectificationService, repairSubmitCheckService, bizTodoItemService, taskService);
// pool.submit(filesToZipThread);
// pool.shutdown();
//启动打包程序打包后给该用户发送消息
Map<String, Object> reData = new HashMap<>();
reData.put("data", null);
reData.put("msg", "回复申请打包");
reData.put("code", WsConstants.CODE_RE_3001);
session.sendMessage(new TextMessage(JSONObject.toJSONString(reData)));
}
break;
default: {
Map<String, Object> reData = new HashMap<>();
reData.put("data", userName);
reData.put("msg", "错误数据包");
reData.put("code", WsConstants.CODE_0000);
session.sendMessage(new TextMessage(JSONObject.toJSONString(reData)));
}
}
} else {
//传过来的错误数据包后需要返回错误数据回复
Map<String, Object> reData = new HashMap<>();
reData.put("data", userName);
reData.put("msg", "错误数据包");
reData.put("code", WsConstants.CODE_0000);
session.sendMessage(new TextMessage(JSONObject.toJSONString(reData)));
}
} else if (("heart").equals(payLoad)) {
Map<String, Object> reData = new HashMap<>();
reData.put("data", userName);
reData.put("msg", "前端保持心跳");
reData.put("code", WsConstants.CODE_RE_1000);
session.sendMessage(new TextMessage(JSONObject.toJSONString(reData)));
} else {
//传过来的错误数据包后需要返回错误数据回复
Map<String, Object> reData = new HashMap<>();
reData.put("data", userName);
reData.put("msg", "错误数据包");
reData.put("code", WsConstants.CODE_0000);
session.sendMessage(new TextMessage(JSONObject.toJSONString(reData)));
}
}
/**
* 当新连接建立的时候被调用
* 连接成功时候会触发页面上onOpen方法
*
* @param session
* @throws Exception
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
users.add(session);
String userName = (String) session.getAttributes().get("WEBSOCKET_USERNAME");
Map<String, Object> data = new HashMap<>();
data.put("data", userName);
data.put("msg", "SUCCESS");
data.put("code", WsConstants.CODE_1111);
data.put("msgType", "uuid");
session.sendMessage(new TextMessage(JSONObject.toJSONString(data)));
}
/**
* 当连接关闭时被调用
*
* @param session
* @param status
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
String userName = (String) session.getAttributes().get("WEBSOCKET_USERNAME");
logger.info("用户 " + userName + " Connection closed. Status: " + status);
users.remove(session);
}
/**
* 传输错误时调用
*
* @param session
* @param exception
* @throws Exception
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
String userName = (String) session.getAttributes().get("WEBSOCKET_USERNAME");
if (session.isOpen()) {
session.close();
}
logger.debug("用户: " + userName + " websocket connection closed......");
users.remove(session);
}
/**
* 给所有在线用户发送消息
*
* @param message
*/
public void sendMessageToUsers(TextMessage message) {
for (WebSocketSession user : users) {
try {
if (user.isOpen()) {
System.out.println("FA:" + user.getAttributes().get("WEBSOCKET_USERNAME"));
System.out.println("FA:==================================================");
System.out.println("FA:==================================================");
System.out.println("FA:==================================================");
System.out.println("FA:==================================================");
System.out.println("FA:==================================================");
System.out.println("FA:==================================================");
System.out.println("FA:==================================================");
System.out.println("FA:==================================================");
user.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 给某个用户发送消息
*
* @param userName
* @param message
*/
public void sendMessageToUser(String userName, TextMessage message) {
for (WebSocketSession user : users) {
String userName1 = (String) user.getAttributes().get("WEBSOCKET_USERNAME");
// System.out.println("userName1 = " + userName1);
if (userName1.equals(userName)) {
try {
if (user.isOpen()) {
user.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
logger.error("sendMessageToUser-error:" + e.getMessage());
}
break;
}
}
}
}

View File

@ -0,0 +1,35 @@
package com.yb.lb.webapp.websocket.interceptor;
import com.yb.lb.common.utils.UuidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
/**
* WebSocket握手拦截器
*/
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {
private final static Logger LOGGER = LoggerFactory.getLogger(WebSocketHandshakeInterceptor.class);
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> attributes) throws Exception {
/*
* 每当有人连接的时候就给这个人创建一个名字
*/
String userName = UuidUtils.get();
//给客户端创建名字
attributes.put("WEBSOCKET_USERNAME", userName);
LOGGER.debug("WebSocket服务器端获得一个新的客户" + userName);
return true;
}
@Override
public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
LOGGER.debug("After Handshake");
}
}