So the server sends Transfer-Encoding: chunked data, and you want to work with the data as it arrives, instead of waiting for the server to finish, the EOF to fire, and only then processing the data?
Here’s an example curl of the kind of session I’m talking about:
➜ curl --verbose --location 'http://localhost:8088/query' \
--header 'Content-Type: application/vnd.ksql.v1+json; charset=utf-8' \
--data-raw '{
"ksql": "SELECT NAME, TS, CAPACITY, EMPTY_PLACES FROM CARPARK_EVENTS WHERE EMPTY_PLACES > 100 emit changes;"
}'
* Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8088 (#0)
> POST /query HTTP/1.1
> Host: localhost:8088
> User-Agent: curl/7.64.1
> Accept: */*
> Content-Type: application/vnd.ksql.v1+json; charset=utf-8
> Content-Length: 118
>
* upload completely sent off: 118 out of 118 bytes
< HTTP/1.1 200 OK
< content-type: application/json
< Transfer-Encoding: chunked
<
The API that I’m working with sends a complete JSON message, but spread over chunks. It starts with a header
[{"header":{"queryId":"none","schema":"`NAME` STRING, `TS` BIGINT, `CAPACITY` INTEGER, `EMPTY_PLACES` INTEGER"}},
and then at some point (perhaps straight away, perhaps after a few seconds) you get some data
{"row":{"columns":["Westgate",1595372100000,116,116]}},
{"row":{"columns":["Burnett St",1595372100000,122,117]}},
and then some empty rows
and then maybe some more data
{"row":{"columns":["Crown Court",1595372100000,142,130]}},
{"row":{"columns":["Leisure Exchange",1595372100000,996,976]}},
This is from a streaming database, and the idea is that the client can use the data as it’s continually sent. Contrast this to the standard request-response pattern of data consumption in which the request is fully satisfied before the client will process the response.
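From my Googling I came across two standard patterns for consuming JSON from a REST call:
- NewDecoder
json.NewDecoder(resp.Body).Decode(&m)
- Unmarshal (which takes a []byte rather than a reader, so the body has to be read in full first)
body, err := io.ReadAll(resp.Body)
json.Unmarshal(body, &m)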
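But I found that both of these blocked until the entire response had been received, which is not what I wanted. That makes sense in hindsight: Decode waits for a complete JSON value, and here the whole response is one array. Courtesy of chourobin I found this solution. First up, create the client and request: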
// Prepare the request
url := "http://localhost:8088/query"
method := "POST"
k := "SELECT NAME, TS, CAPACITY, EMPTY_PLACES FROM CARPARK_EVENTS WHERE EMPTY_PLACES > " + strconv.Itoa(c) + " EMIT CHANGES;"
payload := strings.NewReader("{\"ksql\":\"" + k + "\"}")
// Create the client
client := &http.Client{}
req, err := http.NewRequest(method, url, payload)
if err != nil {
return err
}
req.Header.Add("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8")
// Make the request
res, err := client.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
Now wrap the response body in a bufio.Reader (created with bufio.NewReader) to consume the response:
reader := bufio.NewReader(res.Body)
And then run a loop which consumes the response a line at a time:
for {
	// Read the next line
	lb, err := reader.ReadBytes('\n')
	if err != nil {
		// Got an error back (e.g. EOF), so exit the loop
		break
	}
	// Do stuff with the response here
	fmt.Printf("\nGot some data:\n\t%v", string(lb))
}
What about the JSON?
As you may have noticed in the example response shown above, the chunks are not self-contained JSON.
- The header chunk opens an array:
[{"header":{"queryId":"none","schema":"`NAME` STRING, `TS` BIGINT, `CAPACITY` INTEGER, `EMPTY_PLACES` INTEGER"}},
- Each row chunk is an array entry with a trailing comma:
{"row":{"columns":["Westgate",1595372100000,116,116]}},
The inbound stream of bytes is split into lines using reader.ReadBytes('\n'). This function takes a single byte as the delimiter on which to split, but instead of splitting on \n (ASCII 10) alone, we actually want to split on ,\r\n (ASCII 44, 13, 10) since we have the trailing comma to remove, and the CRLF as the delimiter.
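Now, I think the proper option here is to use a Scanner, but for a quick win I instead did a dirty thing and just truncated the slice by two bytes 🤢. Note that ReadBytes returns the data up to and including the delimiter, so the final \n is still in the slice and counts as one of the two bytes trimmed (enough for a line ending in ,\n; a line ending in ,\r\n would need three):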
if len(lb) > 2 {
	lb = lb[:len(lb)-2]
}
You can then take the slice of bytes and unmarshal the JSON into a Go variable. You need to declare this first, using a custom type. Defining the type is easy using this handy little tool, into which you paste some sample JSON and it spits out the Go type definition:
So taking this Go code:
type ksqlDBMessageRow struct {
Row struct {
Columns []interface{} `json:"columns"`
} `json:"row"`
}
you declare the variable into which you’ll store the row that’s been read:
var r ksqlDBMessageRow
// …
if strings.Contains(string(lb), "row") {
// Looks like a Row, let's process it!
err = json.Unmarshal(lb, &r)
if err != nil {
fmt.Printf("Error decoding JSON %v (%v)\n", string(lb), err)
}
}
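Checking for the substring "row" is quick, but it would also match a header or a value that happens to contain that text. A possible alternative, sketched here under my own assumptions, is to decode every line into one struct with pointer fields and see which one got filled in. The ksqlDBMessage type and decodeLine helper are hypothetical names, and the header fields are taken from the sample response above.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// ksqlDBMessage covers both shapes seen in the stream. A pointer field stays
// nil when its key is absent from the line. (Hypothetical type.)
type ksqlDBMessage struct {
	Header *struct {
		QueryID string `json:"queryId"`
		Schema  string `json:"schema"`
	} `json:"header"`
	Row *struct {
		Columns []interface{} `json:"columns"`
	} `json:"row"`
}

// decodeLine strips the array punctuation around one line (the leading "["
// on the header, the trailing comma and line ending) and decodes the rest.
func decodeLine(line []byte) (ksqlDBMessage, error) {
	var m ksqlDBMessage
	line = bytes.TrimLeft(line, "[")
	line = bytes.TrimRight(line, ", \r\n")
	err := json.Unmarshal(line, &m)
	return m, err
}

func main() {
	lines := []string{
		"[{\"header\":{\"queryId\":\"none\",\"schema\":\"`NAME` STRING, `TS` BIGINT\"}},\n",
		"{\"row\":{\"columns\":[\"Westgate\",1595372100000,116,116]}},\n",
	}
	for _, l := range lines {
		m, err := decodeLine([]byte(l))
		switch {
		case err != nil:
			fmt.Println("error decoding line:", err)
		case m.Header != nil:
			fmt.Println("header with schema:", m.Header.Schema)
		case m.Row != nil:
			fmt.Println("row with columns:", m.Row.Columns)
		}
	}
}
```

Empty lines still need to be skipped before calling decodeLine, since json.Unmarshal returns an error for empty input.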
From that you can then access the actual values in the payload itself:
if r.Row.Columns != nil {
	// Column order follows the SELECT: NAME, TS, CAPACITY, EMPTY_PLACES
	CARPARK = r.Row.Columns[0].(string)
	DATA_TS = r.Row.Columns[1].(float64)
	CAPACITY = r.Row.Columns[2].(float64)
	CURRENT_EMPTY_PLACES = r.Row.Columns[3].(float64)
// Handle the timestamp
t := int64(DATA_TS)
ts := time.Unix(t/1000, 0)
fmt.Printf("Carpark %v at %v has %v spaces available (capacity %v)\n", CARPARK, ts, CURRENT_EMPTY_PLACES, CAPACITY)
}