remainData.go 8.1 KB


  1. package service
  2. import (
  3. "bufio"
  4. "designs/config"
  5. "designs/global"
  6. "designs/model"
  7. "designs/utils"
  8. "fmt"
  9. "github.com/pkg/errors"
  10. "math"
  11. "os"
  12. "path/filepath"
  13. "strconv"
  14. "strings"
  15. "time"
  16. )
  17. func RemainDataBydDay(types int, pf string, gid string, startTime string, endTime string) (map[string]map[string]interface{}, error) {
  18. var err error
  19. start := time.Now()
  20. //对于传过来的endTime ,做一个处理
  21. t, _ := time.Parse("2006-01-02", endTime)
  22. endTimeData := t.AddDate(0, 0, 1).Format("2006-01-02")
  23. UsersBydDay := utils.GetTimeDayDate(startTime, endTime) //用户分别是在哪天注册的
  24. UserLoginBydDay := utils.GetTimeDayDate(startTime, endTime) //用户分别是在哪天注册的
  25. var UsersId []int
  26. userIdMap := make(map[int]bool)
  27. if types == 1 {
  28. //先计算出这个时间内 ,每天的注册用户数量
  29. var users []model.User
  30. err = global.App.DB.Table("user").
  31. Where("pf", pf).Where("gid", gid).
  32. Where("createdAt", ">=", startTime).
  33. Where("createdAt", "<=", endTimeData).
  34. Select("userId", "createdAt").
  35. Scan(&users).Error
  36. if err != nil {
  37. return nil, err
  38. }
  39. fmt.Printf("步骤2耗时: %v\n", time.Since(start))
  40. for _, user := range users {
  41. userIdMap[user.UserId] = true
  42. UsersBydDay[user.CreatedAt.Format("2006-01-02")] = append(UsersBydDay[user.CreatedAt.Format("2006-01-02")], user.UserId)
  43. }
  44. for user := range userIdMap {
  45. UsersId = append(UsersId, user)
  46. }
  47. users = nil //用完后清空内存
  48. } else if types == 2 {
  49. var users []model.UserLogin
  50. err = global.App.DB.Table("user_login").
  51. Where("pf", pf).Where("gid", gid).
  52. Where("loginTime", ">=", startTime).
  53. Where("loginTime", "<=", endTimeData).
  54. Group("userId").
  55. Scan(&users).Error
  56. if err != nil {
  57. return nil, err
  58. }
  59. for _, user := range users {
  60. UsersId = append(UsersId, user.UserId)
  61. UsersBydDay[user.LoginTime.Format("2006-01-02")] = append(UsersBydDay[user.LoginTime.Format("2006-01-02")], user.UserId)
  62. }
  63. } else {
  64. return nil, errors.New("type 参数错误")
  65. }
  66. //把每天的注册用户进行集合,查出后面所有天数的活跃情况
  67. var UserLogin []model.UserOnline
  68. //err = global.App.DB.Table("user_online_copy1 as user_online").
  69. // Where("pf", pf).Where("gid", gid).
  70. // Raw("USE INDEX (DATE)").
  71. // WhereIn("userId", UsersId).
  72. // Where("logTime", ">=", startTime).
  73. // Select("logTime", "user_online.userId").
  74. // Group("date,user_online.userId").
  75. // Scan(&UserLogin).Error
  76. //if err != nil {
  77. // return nil, err
  78. //}
  79. // 使用 Raw 方法执行带有 FORCE INDEX 的 SQL 查询
  80. query := `
  81. SELECT
  82. logTime,
  83. user_online.userId
  84. FROM
  85. user_online AS user_online USE INDEX (date)
  86. INNER JOIN (
  87. SELECT userId
  88. FROM user
  89. WHERE pf = ?
  90. AND gid = ?
  91. AND createdAt >= ?
  92. AND createdAt <= ?
  93. ) AS t ON t.userId = user_online.userId
  94. WHERE
  95. user_online.pf = ?
  96. AND user_online.gid = ?
  97. AND DATE(logTime) >= ?
  98. GROUP BY
  99. date,
  100. user_online.userId;
  101. `
  102. err = global.App.DB.Raw(query, pf, gid, startTime, endTime, pf, gid, startTime).Scan(&UserLogin).Error
  103. if err != nil {
  104. return nil, err
  105. }
  106. fmt.Printf("步骤3耗时: %v\n", time.Since(start))
  107. //对这些数据进行整理
  108. for _, v := range UserLogin {
  109. //根据天进行分组,得出总共有多少
  110. times := v.LogTime.Format("2006-01-02")
  111. UserLoginBydDay[times] = append(UserLoginBydDay[times], v.UserId)
  112. }
  113. res := make(map[string]map[string]interface{})
  114. //逐天比较注册的数据和留存的数据,得出每天的活跃比例和人数
  115. for day, v := range UsersBydDay {
  116. remain := make(map[string]interface{})
  117. remain["count"] = len(v)
  118. remain["date"] = day
  119. //分别比较每一天的数据
  120. for dayDate, loginDay := range UserLoginBydDay {
  121. ok, remainDays := utils.CompareDates(dayDate, day)
  122. if !ok {
  123. continue
  124. }
  125. remain[remainDays] = math.Round(utils.IntersectionRate(v, loginDay)*100*100) / 100
  126. }
  127. res[day] = remain
  128. }
  129. fmt.Printf("步骤4耗时: %v\n", time.Since(start))
  130. return res, nil
  131. }
  132. func RemainDataBydDayNew(types int, pf string, gid string, startTime string, endTime string) (map[string]map[string]interface{}, error) {
  133. //目前只有types = 1
  134. //对于传过来的endTime ,做一个处理
  135. t, _ := time.Parse("2006-01-02", endTime)
  136. endTimeData := t.AddDate(0, 0, 1).Format("2006-01-02")
  137. UsersBydDay := utils.GetTimeDayDate(startTime, endTime) //用户分别是在哪天注册的
  138. //先计算出这个时间内 ,每天的注册用户数量
  139. var users []model.User
  140. err := global.App.DB.Table("user").
  141. Where("pf", pf).Where("gid", gid).
  142. Where("createdAt", ">=", startTime).
  143. Where("createdAt", "<=", endTimeData).
  144. Select("userId", "createdAt").
  145. Scan(&users).Error
  146. if err != nil {
  147. return nil, err
  148. }
  149. var UsersId []int
  150. userIdMap := make(map[int]bool)
  151. for _, user := range users {
  152. userIdMap[user.UserId] = true
  153. UsersBydDay[user.CreatedAt.Format("2006-01-02")] = append(UsersBydDay[user.CreatedAt.Format("2006-01-02")], user.UserId)
  154. }
  155. for user := range userIdMap {
  156. UsersId = append(UsersId, user)
  157. }
  158. UserLoginBydDay := utils.GetTimeDayDate(startTime, endTime) //用户分别是在哪天注册的
  159. for k, _ := range UserLoginBydDay {
  160. //根据pf,gid,startTime,endTime 取出数据
  161. UserLoginBydDay[k] = GetLocalActiveLog(gid, pf, k)
  162. }
  163. res := make(map[string]map[string]interface{})
  164. for day, v := range UsersBydDay {
  165. remain := make(map[string]interface{})
  166. remain["count"] = len(v)
  167. remain["date"] = day
  168. //分别比较每一天的数据
  169. for dayDate, loginDay := range UserLoginBydDay {
  170. ok, remainDays := utils.CompareDates(dayDate, day)
  171. if !ok {
  172. continue
  173. }
  174. remain[remainDays] = math.Round(utils.IntersectionRate(v, loginDay)*100*100) / 100
  175. }
  176. res[day] = remain
  177. }
  178. return res, nil
  179. }
  180. func OnlineToFile() {
  181. //分批读取数据
  182. now := time.Now()
  183. for i := 1; i <= 45; i++ {
  184. var onlineData []model.UserOnline
  185. date := now.AddDate(0, 0, -i).Format("20060102")
  186. global.App.DB.Table("user_online_old").Where("date", date).Scan(&onlineData)
  187. //根据gid和pf,分文件存储
  188. for _, v := range onlineData {
  189. SetLocalActiveLog(v.Gid, v.Pf, "", v.UserId, v.Type, v.LogTime)
  190. }
  191. fmt.Println(date, "数据归档完成,耗时:", time.Since(now))
  192. }
  193. //var onlineData []model.UserOnline
  194. //
  195. //date := "20250423"
  196. //
  197. //global.App.DB.Table("user_online").Where("date", date).Scan(&onlineData)
  198. //
  199. ////根据gid和pf,分文件存储
  200. //for _, v := range onlineData {
  201. // SetLocalActiveLog(v.Gid, v.Pf, "", v.UserId, v.Type, v.LogTime)
  202. //}
  203. }
  204. func SetLocalActiveLog(gid string, pf string, openId string, userId int, types int, now time.Time) error {
  205. // 生成日期目录路径(示例:./data/2024-06-15)
  206. dateDir := now.Format("2006-01-02")
  207. dirPath := filepath.Join("storage", dateDir)
  208. // 创建日期目录(若不存在)
  209. if err := os.MkdirAll(dirPath, 0755); err != nil {
  210. return err
  211. }
  212. // 生成文件名(示例:g123_wechat.txt)
  213. fileName := fmt.Sprintf("%s_%s.txt", gid, pf)
  214. filePath := filepath.Join(dirPath, fileName)
  215. // 追加写入文件
  216. file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  217. if err != nil {
  218. return fmt.Errorf("failed to open file: %v", err)
  219. }
  220. defer file.Close()
  221. // 写入数据(示例:1584,oXyZ123456\n)
  222. if _, err := fmt.Fprintf(file, "%d,%d,%s\n", userId, types, now.Format("2006-01-02 15:04:05")); err != nil {
  223. return fmt.Errorf("failed to write file: %v", err)
  224. }
  225. return nil
  226. }
  227. func GetLocalActiveLog(gid string, pf string, date string) []int {
  228. var dir string
  229. if config.Get("app.local") == "local" {
  230. //url = "mongodb://localhost:27017"
  231. dir = "storage"
  232. } else {
  233. dir = "/www/wwwroot/chunhao_receive/storage"
  234. }
  235. dirPath := filepath.Join(dir, date)
  236. fileName := fmt.Sprintf("%s_%s.txt", gid, pf)
  237. filePath := filepath.Join(dirPath, fileName)
  238. file, _ := os.Open(filePath)
  239. defer file.Close() // 确保函数结束时关闭文件
  240. // 创建 Scanner 对象
  241. scanner := bufio.NewScanner(file)
  242. var res []int
  243. // 逐行读取文件内容
  244. lineNumber := 1
  245. for scanner.Scan() {
  246. line := scanner.Text() // 获取当前行内容
  247. lineNumber++
  248. userId, _ := strconv.Atoi(strings.Split(line, ",")[0])
  249. if !utils.InArray(userId, res) {
  250. res = append(res, userId)
  251. }
  252. }
  253. return res
  254. }