lucas 1 week ago
parent
commit
8c11b2870e
  1. 5
      consumers/consumerZWYHBJCAS.go
  2. 38
      dbOperate/httpHelper.go

5
consumers/consumerZWYHBJCAS.go

@ -75,6 +75,7 @@ func (the *consumerZWYHBJCAS) InputInitial() error {
the.ch = make(chan []adaptors.NeedPush, 200)
//数据入口
the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""}
the.InHttp.InitialWithTls()
the.InKafka = _kafka.KafkaHelper{
Brokers: the.Info.IoConfig.In.Kafka.Brokers,
GroupId: the.Info.IoConfig.In.Kafka.GroupId,
@ -184,7 +185,7 @@ func (the *consumerZWYHBJCAS) getEs1HourAggData() {
for _, factorId := range hourFactorIds {
esQuery := the.getESQueryStrByHour(structId, factorId, start, end)
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth)
esAggResultStr := the.InHttp.HttpGetWithHeaderWithTLS(esQuery, auth)
adaptor := the.getAdaptor()
adaptor.PointInfo = the.Info.PointInfo
@ -212,7 +213,7 @@ func (the *consumerZWYHBJCAS) getEs10minAggData() {
for _, factorId := range factorIds {
esQuery := the.getESQueryStrBy10min(structId, factorId, start, end)
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth)
esAggResultStr := the.InHttp.HttpGetWithHeaderWithTLS(esQuery, auth)
adaptor := the.getAdaptor()
adaptor.PointInfo = the.Info.PointInfo

38
dbOperate/httpHelper.go

@ -32,6 +32,14 @@ func (the *HttpHelper) SetAuth(userName, passWord string) {
the.userName = userName
the.passWord = passWord
}
func (the *HttpHelper) InitialWithTls() {
the.client = http.Client{}
//取消证书校验,避免证书过期问题
the.client.Transport = &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}
the.client.Timeout = time.Minute
}
func (the *HttpHelper) HttpGet(queryBody string) string {
client := the.client
req, err := http.NewRequest("GET", the.Url, strings.NewReader(queryBody))
@ -48,6 +56,34 @@ func (the *HttpHelper) HttpGet(queryBody string) string {
body, err := io.ReadAll(resp.Body)
return string(body)
}
func (the *HttpHelper) HttpGetWithHeaderWithTLS(queryBody string, headerMap map[string]string) string {
url := the.Url
client := the.client
req, err := http.NewRequest("GET", url, strings.NewReader(queryBody))
req.Header.Set("Content-Type", "application/json")
for k, v := range headerMap {
req.Header.Set(k, v)
}
resp, err := client.Do(req)
if err != nil {
log.Printf("查询esproxy 异常=>%s", err.Error())
return ""
}
defer func(resp *http.Response) {
if resp != nil && resp.Body != nil {
errClose := resp.Body.Close()
if errClose != nil {
log.Printf("关闭异常")
}
}
}(resp)
log.Println("http get 请求,url", url, " <- code=", resp.StatusCode)
body, err := io.ReadAll(resp.Body)
return string(body)
}
func (the *HttpHelper) HttpGetWithHeader(queryBody string, headerMap map[string]string) string {
url := the.Url
client := the.client
@ -57,6 +93,7 @@ func (the *HttpHelper) HttpGetWithHeader(queryBody string, headerMap map[string]
for k, v := range headerMap {
req.Header.Set(k, v)
}
//取消证书校验,避免证书过期问题
client.Transport = &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}
@ -77,6 +114,7 @@ func (the *HttpHelper) HttpGetWithHeader(queryBody string, headerMap map[string]
log.Printf("http get 请求,url=%s, -> code=%d,respBody=%s", url, resp.StatusCode, body)
return string(body)
}
func (the *HttpHelper) Publish(messageBytes []byte) (string, error) {
url := the.Url
client := the.client

Loading…
Cancel
Save