Data Streams SDK (Go)

This page is a reference for the Data Streams SDK for Go. The SDK is a client library that provides a domain-oriented abstraction over the Data Streams API. It supports both point-in-time data retrieval and real-time data streaming, with built-in fault tolerance.

Requirements

  • Go 1.22.4 or later
  • Valid Chainlink Data Streams credentials

Installation

go get github.com/smartcontractkit/data-streams-sdk/go

streams

Import

import streams "github.com/smartcontractkit/data-streams-sdk/go"

Types

Client interface

Interface for interacting with the Data Streams API. Use the New function to create a new instance of the client.

Interface Methods
  • GetFeeds: Lists all streams available to you.

    GetFeeds(ctx context.Context) (r []*feed.Feed, err error)
    
  • GetLatestReport: Fetches the latest report available for the specified FeedID.

    GetLatestReport(ctx context.Context, id feed.ID) (r *ReportResponse, err error)
    
  • GetReports: Fetches reports for the specified stream IDs and a given timestamp.

    GetReports(ctx context.Context, ids []feed.ID, timestamp uint64) ([]*ReportResponse, error)
    
  • GetReportPage: Paginates the reports for a specified FeedID starting from a given timestamp.

    GetReportPage(ctx context.Context, id feed.ID, startTS uint64) (*ReportPage, error)
    
  • Stream: Creates a real-time report stream for specified stream IDs.

    Stream(ctx context.Context, feedIDs []feed.ID) (Stream, error)
    
  • StreamWithStatusCallback: Creates a real-time report stream for specified stream IDs with a callback function for connection status updates.

    StreamWithStatusCallback(ctx context.Context, feedIDs []feed.ID, connStatusCallback func(isConnected bool, host string, origin string)) (Stream, error)
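
The connStatusCallback parameter receives one call per connection status change. The following is a minimal sketch of a callback that tracks which origins are currently connected. The connTracker type is hypothetical and not part of the SDK, the host and origin values are made up, and the mutex assumes the callback may be invoked from more than one goroutine, which this reference does not state.

```go
package main

import (
	"fmt"
	"sync"
)

// connTracker is a hypothetical helper (not part of the SDK) that records
// the last known state of each origin reported through the callback.
type connTracker struct {
	mu     sync.Mutex
	active map[string]bool // keyed by origin
}

func newConnTracker() *connTracker {
	return &connTracker{active: make(map[string]bool)}
}

// onStatus has the same signature as the connStatusCallback parameter.
func (c *connTracker) onStatus(isConnected bool, host string, origin string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.active[origin] = isConnected
	fmt.Printf("host=%s origin=%s connected=%t\n", host, origin, isConnected)
}

// activeCount returns how many origins are currently marked as connected.
func (c *connTracker) activeCount() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	n := 0
	for _, ok := range c.active {
		if ok {
			n++
		}
	}
	return n
}

func main() {
	tracker := newConnTracker()
	// Simulated status changes. With the SDK, tracker.onStatus would be
	// passed as the connStatusCallback argument instead.
	tracker.onStatus(true, "ws.example.com", "origin-a")
	tracker.onStatus(true, "ws.example.com", "origin-b")
	tracker.onStatus(false, "ws.example.com", "origin-a")
	fmt.Println("active origins:", tracker.activeCount())
}
```

Keeping the callback short and non-blocking is a reasonable default, since it may run on the same path that manages the connections.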
    

Config struct

Config specifies the client configuration and dependencies. If you set the Logger function, the client logs informational activity through it.

type Config struct {
	ApiKey             string                        // Client API key
	ApiSecret          string                        // Client API secret
	RestURL            string                        // Rest API URL
	WsURL              string                        // Websocket API URL
	WsHA               bool                          // Use concurrent connections to multiple Streams servers
	WsMaxReconnect     int                           // Maximum number of reconnection attempts for Stream underlying connections
	LogDebug           bool                          // Log debug information
	InsecureSkipVerify bool                          // Skip server certificate chain and host name verification
	Logger             func(format string, a ...any) // Logger function

	// InspectHttpResponse intercepts http responses for rest requests.
	// The response object must not be modified.
	InspectHttpResponse func(*http.Response)
}

Note: If WsMaxReconnect is not specified, it defaults to 5 attempts.

CtxKey string

Custom context key type used for passing additional headers.

type CtxKey string

  • Constants:

    • CustomHeadersCtxKey: Key for passing custom HTTP headers.
    const (
      // CustomHeadersCtxKey is the context.Context key used to pass
      // custom HTTP headers, as an http.Header, to the client.
      // Custom header values overwrite client headers that have the same key.
      CustomHeadersCtxKey CtxKey = "CustomHeaders"
    )
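
As an illustration of how the key is used, the sketch below attaches an http.Header to a context and reads it back. CtxKey and CustomHeadersCtxKey are redeclared locally only so that the example compiles without the SDK. The X-Trace-Id header and the helper function names are hypothetical, and the lookup shown is not necessarily how the client reads the value internally.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
)

// CtxKey and CustomHeadersCtxKey mirror the SDK declarations so that this
// sketch compiles on its own. In real code, use streams.CtxKey and
// streams.CustomHeadersCtxKey instead.
type CtxKey string

const CustomHeadersCtxKey CtxKey = "CustomHeaders"

// withCustomHeaders returns a child context that carries the given headers.
func withCustomHeaders(ctx context.Context, h http.Header) context.Context {
	return context.WithValue(ctx, CustomHeadersCtxKey, h)
}

// headerFromContext shows the kind of lookup a consumer of the context
// could perform. It returns "" when no headers are attached.
func headerFromContext(ctx context.Context, name string) string {
	h, ok := ctx.Value(CustomHeadersCtxKey).(http.Header)
	if !ok {
		return ""
	}
	return h.Get(name)
}

// traceIDRoundTrip attaches a hypothetical X-Trace-Id header and reads it back.
func traceIDRoundTrip(id string) string {
	h := http.Header{}
	h.Set("X-Trace-Id", id)
	ctx := withCustomHeaders(context.Background(), h)
	return headerFromContext(ctx, "X-Trace-Id")
}

func main() {
	fmt.Println(traceIDRoundTrip("abc-123"))
}
```

Context keys are compared by both type and value, so a plain string "CustomHeaders" does not match a key of type CtxKey. The context passed to the client methods should therefore be built with the SDK's own constant.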
    

ReportPage struct

Represents a paginated response of reports.

type ReportPage struct {
	Reports    []*ReportResponse // Slice of ReportResponse, representing individual report entries.
	NextPageTS uint64            // Timestamp for the next page, used for fetching subsequent pages.
}
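
The NextPageTS field suggests a cursor-style loop: request a page, process its reports, then request again starting from NextPageTS. The sketch below shows that loop against local stand-in types so that it runs without the SDK. The fetchPage function type stands in for a call such as GetReportPage, and treating an empty page as the end of the data is an assumption, since this reference does not describe the end condition.

```go
package main

import "fmt"

// reportPage is a local stand-in for ReportPage, reduced to what the loop needs.
type reportPage struct {
	Reports    []uint64 // stand-in for []*ReportResponse; holds observation timestamps
	NextPageTS uint64
}

// fetchPage stands in for a call such as client.GetReportPage(ctx, id, startTS).
type fetchPage func(startTS uint64) (*reportPage, error)

// collect walks pages from startTS until a page comes back empty or maxPages
// is reached. Treating an empty page as the end is an assumption.
func collect(fetch fetchPage, startTS uint64, maxPages int) ([]uint64, error) {
	var all []uint64
	ts := startTS
	for i := 0; i < maxPages; i++ {
		page, err := fetch(ts)
		if err != nil {
			return all, err
		}
		if len(page.Reports) == 0 {
			break
		}
		all = append(all, page.Reports...)
		if page.NextPageTS <= ts { // guard against a cursor that does not advance
			break
		}
		ts = page.NextPageTS
	}
	return all, nil
}

// fakeFetch serves timestamps 100 through 104 in pages of two, for demonstration.
func fakeFetch(startTS uint64) (*reportPage, error) {
	p := &reportPage{}
	for ts := startTS; ts < 105 && len(p.Reports) < 2; ts++ {
		p.Reports = append(p.Reports, ts)
	}
	if n := len(p.Reports); n > 0 {
		p.NextPageTS = p.Reports[n-1] + 1
	}
	return p, nil
}

func main() {
	got, err := collect(fakeFetch, 100, 10)
	fmt.Println(got, err) // [100 101 102 103 104] <nil>
}
```

The maxPages bound and the non-advancing cursor check keep the loop from running indefinitely if the service returns an unexpected NextPageTS.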

ReportResponse struct

Implements the report envelope that contains the full report payload, its stream ID and timestamps. Use the Decode function to decode the report payload.

type ReportResponse struct {
    FeedID                feed.ID `json:"feedID"`
    FullReport            []byte  `json:"fullReport"`
    ValidFromTimestamp    uint64  `json:"validFromTimestamp"`
    ObservationsTimestamp uint64  `json:"observationsTimestamp"`
}

Methods
  • MarshalJSON: Serializes the ReportResponse into JSON.

    func (r *ReportResponse) MarshalJSON() ([]byte, error)
    
  • String: Returns the string representation of the ReportResponse.

    func (r *ReportResponse) String() (s string)
    
  • UnmarshalJSON: Deserializes the ReportResponse from JSON.

    func (r *ReportResponse) UnmarshalJSON(b []byte) (err error)
    

Stats struct

Operational statistics for a Stream.

type Stats struct {
	Accepted              uint64 // Total number of accepted reports
	Deduplicated          uint64 // Total number of deduplicated reports when in HA
	TotalReceived         uint64 // Total number of received reports
	PartialReconnects     uint64 // Total number of partial reconnects when in HA
	FullReconnects        uint64 // Total number of full reconnects
	ConfiguredConnections uint64 // Number of configured connections if in HA
	ActiveConnections     uint64 // Current number of active connections
}

Methods
  • String: Returns a string representation of the Stats.

    func (s Stats) String() (st string)
    

Stream interface

Interface for managing a real-time data stream.

Interface Methods
  • Read: Reads the next available report from the stream. The call blocks until one of the following occurs:

    • A report is successfully received.
    • The provided context is done, for example because it timed out or was canceled.
    • An error state is encountered in any of the underlying connections, such as a network failure.

    Read(context.Context) (*ReportResponse, error)
    
  • Stats: Returns statistics about the stream operations as Stats.

    Stats() Stats
    
  • Close: Closes the stream.

    Close() error
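
A typical consumer calls Read in a loop and closes the stream when it is finished. The sketch below shows one possible shape for that loop. The reportResponse and stream types are local stand-ins for ReportResponse and Stream, fakeStream replaces the network so that the example runs on its own, and the assumption that a canceled Read returns an error wrapping the context error is not confirmed by this reference.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// reportResponse and stream are local stand-ins for ReportResponse and Stream.
type reportResponse struct {
	ObservationsTimestamp uint64
}

type stream interface {
	Read(ctx context.Context) (*reportResponse, error)
	Close() error
}

// fakeStream delivers reports from a channel so the loop can run offline.
type fakeStream struct{ ch chan *reportResponse }

func (f *fakeStream) Read(ctx context.Context) (*reportResponse, error) {
	select {
	case r := <-f.ch:
		return r, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func (f *fakeStream) Close() error { return nil }

// consume reads until the context ends and returns the number of reports seen.
// Context expiry is treated as a normal stop; any other error is returned.
func consume(ctx context.Context, s stream) (int, error) {
	defer s.Close()
	n := 0
	for {
		r, err := s.Read(ctx)
		if err != nil {
			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
				return n, nil
			}
			return n, err
		}
		fmt.Println("report observed at", r.ObservationsTimestamp)
		n++
	}
}

// runDemo feeds three reports into a fake stream and stops on a short timeout.
func runDemo() int {
	fs := &fakeStream{ch: make(chan *reportResponse, 3)}
	for ts := uint64(1); ts <= 3; ts++ {
		fs.ch <- &reportResponse{ObservationsTimestamp: ts}
	}
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	n, _ := consume(ctx, fs)
	return n
}

func main() {
	fmt.Println("reports read:", runDemo())
}
```

With the SDK, the error branch is also where a check against ErrStreamClosed would fit, for example with errors.Is.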
    

High Availability (HA) Mode

The Data Streams SDK supports High Availability mode for WebSocket connections. When enabled with WsHA: true, the SDK maintains concurrent connections to different server instances, which improves fault tolerance and minimizes the risk of report gaps.

HA Mode Behavior

When high availability mode is enabled:

  • The client queries the server for available origins using the X-Cll-Available-Origins header
  • Multiple WebSocket connections are established to different server instances
  • Reports are deduplicated when received across connections
  • Partial reconnects occur when some connections fail while others remain active
  • Full reconnects occur when all connections fail
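
To make the deduplication step concrete, the sketch below drops reports that have already arrived over another connection and keeps counters named after three of the Stats fields. It illustrates the idea only and does not show the SDK's internal logic. In particular, identifying a report by its stream ID and observations timestamp is an assumption, and the stream ID shown is a shortened placeholder.

```go
package main

import "fmt"

// reportKey identifies a report by stream and observation time. Using this
// pair as the identity of a report is an assumption made for this sketch.
type reportKey struct {
	feedID                string
	observationsTimestamp uint64
}

// counters mirrors three fields of the Stats struct.
type counters struct {
	Accepted      uint64
	Deduplicated  uint64
	TotalReceived uint64
}

// deduper drops reports that were already delivered by another connection.
type deduper struct {
	seen  map[reportKey]struct{}
	stats counters
}

func newDeduper() *deduper {
	return &deduper{seen: make(map[reportKey]struct{})}
}

// accept reports whether the report is new and updates the counters.
func (d *deduper) accept(feedID string, ts uint64) bool {
	d.stats.TotalReceived++
	k := reportKey{feedID: feedID, observationsTimestamp: ts}
	if _, dup := d.seen[k]; dup {
		d.stats.Deduplicated++
		return false
	}
	d.seen[k] = struct{}{}
	d.stats.Accepted++
	return true
}

func main() {
	d := newDeduper()
	d.accept("0x0003aa", 100) // arrives on connection A
	d.accept("0x0003aa", 100) // same report arrives on connection B
	d.accept("0x0003aa", 101) // next report
	fmt.Printf("%+v\n", d.stats) // {Accepted:2 Deduplicated:1 TotalReceived:3}
}
```

The map in this sketch grows without bound. A long-running consumer would need to evict old keys, for example by keeping only the latest timestamp per stream.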

HA Configuration

HA mode is controlled by the WsHA field in the client configuration:

config := streams.Config{
    WsHA: true, // Use concurrent connections to multiple Streams servers
}

Functions

New

Creates a new client instance with the specified configuration.

func New(cfg Config) (c Client, err error)

Note: New does not initialize any connections to the Data Streams service.

LogPrintf

Utility function for logging.

func LogPrintf(format string, a ...any)

Errors

ErrStreamClosed

Error returned when attempting to use a closed stream.

var ErrStreamClosed = fmt.Errorf("client: use of closed Stream")

feed

Import

import feed "github.com/smartcontractkit/data-streams-sdk/go/feed"

Types

Feed struct

Identifies the report stream ID.

type Feed struct {
    FeedID ID `json:"feedID"`
}

Where ID is the unique identifier for the stream.

FeedVersion uint16

Represents the stream report schema version.

type FeedVersion uint16

Constants
  • FeedVersion1: Version 1 schema
  • FeedVersion2: Version 2 schema
  • FeedVersion3: Version 3 schema
  • FeedVersion4: Version 4 schema
  • FeedVersion5: Version 5 schema
  • FeedVersion6: Version 6 schema
  • FeedVersion7: Version 7 schema
  • FeedVersion8: Version 8 schema
  • FeedVersion9: Version 9 schema
  • FeedVersion10: Version 10 schema

ID [32]byte

Represents a unique identifier for a stream.

type ID [32]byte

Methods
  • FromString: Converts a string into an ID.

    func (f *ID) FromString(s string) (err error)
    
  • MarshalJSON: Serializes the ID into JSON.

    func (f *ID) MarshalJSON() (b []byte, err error)
    
  • String: Returns the string representation of the ID.

    func (f *ID) String() (id string)
    
  • UnmarshalJSON: Deserializes the ID from JSON.

    func (f *ID) UnmarshalJSON(b []byte) (err error)
    
  • Version: Returns the version of the stream.

    func (f *ID) Version() FeedVersion
    

report

Import

import report "github.com/smartcontractkit/data-streams-sdk/go/report"

Types

Data interface

Interface that represents the actual report data and attributes.

type Data interface {
    v1.Data | v2.Data | v3.Data | v4.Data | v5.Data | v6.Data | v7.Data | v8.Data | v9.Data | v10.Data
    Schema() abi.Arguments
}

Report struct

Represents a decoded report: the schema-specific report data (Data) together with the report context, the raw report blob, and the raw signature components (RawRs, RawSs, RawVs).

type Report[T Data] struct {
    Data          T
    ReportContext [3][32]byte
    ReportBlob    []byte
    RawRs         [][32]byte
    RawSs         [][32]byte
    RawVs         [32]byte
}

Functions

Decode

Decodes the report serialized bytes and its data.

func Decode[T Data](fullReport []byte) (r *Report[T], err error)

Example:

payload, _ := hex.DecodeString(
    `0006bd87830d5f336e205cf5c63329a1dab8f5d56812eaeb7c69300e66ab8e22000000000000000000000000000000000000000000000000000000000cf7ed13000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e0000000000000000000000000000000000000000000000000000000000000022000000000000000000000000000000000000000000000000000000000000003000101000101000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000012000030ab7d02fbba9c6304f98824524407b1f494741174320cfd17a2c22eec1de0000000000000000000000000000000000000000000000000000000066a8f5c60000000000000000000000000000000000000000000000000000000066a8f5c6000000000000000000000000000000000000000000000000000057810653dd9000000000000000000000000000000000000000000000000000541315da76d6100000000000000000000000000000000000000000000000000000000066aa474600000000000000000000000000000000000000000000000009a697ee4230350400000000000000000000000000000000000000000000000009a6506d1426d00000000000000000000000000000000000000000000000000009a77d03ae355fe0000000000000000000000000000000000000000000000000000000000000000672bac991f5233df89f581dc02a89dd8d48419e3558b247d3e65f4069fa45c36658a5a4820dc94fc47a88a21d83474c29ee38382c46b6f9a575b9ce8be4e689c03c76fac19fbec4a29dba704c72cc003a6be1f96af115e322321f0688e24720a5d9bd7136a1d96842ec89133058b888b2e6572b5d4114de2426195e038f1c9a5ce50016b6f5a5de07e08529b845e1c622dcbefa0cfa2ffd128e9932ecee8efd869bc56d09a50ceb360a8d366cfa8eefe3f64279c88bdbc887560efa9944238eb000000000000000000000000000000000000000000000000000000000000000060e2a800f169f26164533c7faff6c9073cd6db240d89444d3487113232f9c31422a0993bb47d56807d0dc26728e4c8424bb9db77511001904353f1022168723010c46627c890be6e701e766679600696866c888ec80e7dbd428f5162a24f2d8262f846bdb06d9e46d295dd8e896fb232be80534b0041660fe4450a7ede9bc3b230722381773a4ae81241568867a759f53c2bdd05d32b209e78845fc58203949e50a608942b270c456001e578227ad00861cf5f47b27b09137a0c4b7f8b4746cef`)

// Name the result "decoded" so that it does not shadow the imported report package.
decoded, err := report.Decode[v3.Data](payload)
if err != nil {
    streams.LogPrintf("error decoding report: %s", err)
    os.Exit(1)
}

streams.LogPrintf(
    "FeedID: %s, FeedVersion: %d, Bid: %s, Ask: %s, Benchmark: %s, LinkFee: %s, NativeFee: %s, ValidFromTS: %d, ExpiresAt: %d",
    decoded.Data.FeedID.String(),
    decoded.Data.FeedID.Version(),
    decoded.Data.Bid.String(),
    decoded.Data.Ask.String(),
    decoded.Data.BenchmarkPrice.String(),
    decoded.Data.LinkFee.String(),
    decoded.Data.NativeFee.String(),
    decoded.Data.ValidFromTimestamp,
    decoded.Data.ExpiresAt,
)

Report Format and Schema Versions

Schema Versions

The SDK supports report schema versions v1 through v10. The schema version is encoded in the first two bytes of the feed ID:

  • v2: Feed IDs starting with 0x0002
  • v3: Feed IDs starting with 0x0003 (Crypto Streams)
  • v4: Feed IDs starting with 0x0004 (Real-World Assets)
  • v5: Feed IDs starting with 0x0005
  • v6: Feed IDs starting with 0x0006 (Multiple Price Values)
  • v7: Feed IDs starting with 0x0007 (Exchange Rate)
  • v8: Feed IDs starting with 0x0008 (Non-OTC RWA)
  • v9: Feed IDs starting with 0x0009 (NAV Fund Data)
  • v10: Feed IDs starting with 0x000a (Tokenized Equity)
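
The prefix rule above can be checked with the standard library alone. The sketch below parses a hex feed ID and reads its first two bytes as a big-endian number. It is a stand-in for illustration and not the SDK's ID.Version implementation. The schemaVersion and exampleID functions are hypothetical, and the feed IDs are placeholders made of a version prefix followed by zero bytes.

```go
package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"strings"
)

// schemaVersion parses a 32-byte hex feed ID (with or without a 0x prefix)
// and returns the big-endian value of its first two bytes.
func schemaVersion(feedID string) (uint16, error) {
	raw, err := hex.DecodeString(strings.TrimPrefix(feedID, "0x"))
	if err != nil {
		return 0, fmt.Errorf("invalid hex: %w", err)
	}
	if len(raw) != 32 {
		return 0, fmt.Errorf("feed ID must be 32 bytes, got %d", len(raw))
	}
	return binary.BigEndian.Uint16(raw[:2]), nil
}

// exampleID builds a placeholder feed ID: a 2-byte version prefix such as
// "0x0003" followed by 30 zero bytes. It does not identify a real stream.
func exampleID(prefix string) string {
	return prefix + strings.Repeat("00", 30)
}

func main() {
	for _, prefix := range []string{"0x0003", "0x000a"} {
		v, err := schemaVersion(exampleID(prefix))
		fmt.Println(prefix, v, err)
	}
}
```

In application code, calling Version on a feed.ID is the direct way to get this value. The sketch only shows where the number comes from.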

Common Fields

All report versions include standard metadata:

  • FeedID: Unique identifier for the data stream
  • ValidFromTimestamp: When the report becomes valid
  • ExpiresAt: When the report expires
  • LinkFee: Fee in LINK tokens
  • NativeFee: Fee in native blockchain currency
  • ObservationsTimestamp: When the data was observed
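
ValidFromTimestamp and ExpiresAt together describe a validity window. The sketch below checks whether a point in time falls inside that window. It assumes that both fields are Unix timestamps in seconds and that both bounds are inclusive, neither of which is stated in this reference. The isWithinWindow function is hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// isWithinWindow reports whether now lies in [validFrom, expiresAt].
// All values are Unix seconds and both bounds are inclusive; both points
// are assumptions of this sketch.
func isWithinWindow(validFrom, expiresAt, now int64) bool {
	return now >= validFrom && now <= expiresAt
}

func main() {
	now := time.Now().Unix()
	// Hypothetical report that became valid a minute ago and expires in a day.
	validFrom, expiresAt := now-60, now+86400
	fmt.Println(isWithinWindow(validFrom, expiresAt, now))         // true
	fmt.Println(isWithinWindow(validFrom, expiresAt, expiresAt+1)) // false
}
```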

Usage Pattern

Each version follows the same decoding pattern:

import v3 "github.com/smartcontractkit/data-streams-sdk/go/report/v3"

decoded, err := report.Decode[v3.Data](fullReport)

Use the appropriate version import based on your feed's schema version, which can be determined by calling feedID.Version().

For complete field definitions and decoding examples, see the report schema overview and fetch tutorial.
