Client Package Methods and Functions

Execute

func (job *GraphBatchJob) Execute(ctx context.Context) error

Execute runs all commands in the GraphBatchJob within a Neo4j session.

It iterates over the commands in the GraphBatchJob and executes them against the specified database in the configured access mode (read or write). Each command's Cypher query and parameters are executed in a Neo4j transaction. On success, the command's OnSuccess handler is invoked with the resulting records. On failure, the command's OnFailure handler is invoked and the function returns the encountered error.

The function ensures proper resource management by opening and closing a session within the context provided.

Parameters:

  • ctx: The context for managing the lifecycle of the session and transactions.

Returns:

  • error: Returns an error if any command fails during execution. The error includes details about which command failed.

Usage Example:

    // cmd1, cmd2, and neo4jClient are assumed to have been created earlier.
    job := &GraphBatchJob{
        Commands:    []command.CommandInterface{cmd1, cmd2},
        Database:    "exampleDB",
        GraphClient: neo4jClient,
        AccessMode:  neo4j.AccessModeWrite,
    }
    err := job.Execute(context.Background())
    if err != nil {
        log.Fatalf("Batch execution failed: %v", err)
    }
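
The handler flow described for Execute can be sketched without a database. The following is a minimal sketch, not the package's implementation: the command struct, the runFunc type, executeAll, and fakeRun are hypothetical stand-ins, and stopping at the first failing command is an assumption based on the description above.

```go
package main

import (
	"errors"
	"fmt"
)

// command is a hypothetical stand-in for a batch command: a Cypher query,
// its parameters, and the two handlers described above.
type command struct {
	Cypher    string
	Params    map[string]interface{}
	OnSuccess func(records []map[string]interface{})
	OnFailure func(err error)
}

// runFunc is a hypothetical stand-in for running one query inside a Neo4j
// transaction; it is not part of the documented package.
type runFunc func(cypher string, params map[string]interface{}) ([]map[string]interface{}, error)

// executeAll runs the commands in order. On the first failure it calls
// OnFailure and returns an error that names the failing command.
func executeAll(cmds []command, run runFunc) error {
	for i, cmd := range cmds {
		records, err := run(cmd.Cypher, cmd.Params)
		if err != nil {
			if cmd.OnFailure != nil {
				cmd.OnFailure(err)
			}
			return fmt.Errorf("command %d (%q) failed: %w", i, cmd.Cypher, err)
		}
		if cmd.OnSuccess != nil {
			cmd.OnSuccess(records)
		}
	}
	return nil
}

// fakeRun simulates the database: the query text "FAIL" produces an error,
// anything else returns a single record.
func fakeRun(cypher string, _ map[string]interface{}) ([]map[string]interface{}, error) {
	if cypher == "FAIL" {
		return nil, errors.New("simulated failure")
	}
	return []map[string]interface{}{{"ok": true}}, nil
}

func main() {
	succeeded, failed := 0, 0
	mk := func(q string) command {
		return command{
			Cypher:    q,
			OnSuccess: func([]map[string]interface{}) { succeeded++ },
			OnFailure: func(error) { failed++ },
		}
	}
	err := executeAll([]command{mk("RETURN 1"), mk("FAIL"), mk("RETURN 2")}, fakeRun)
	fmt.Println(succeeded, failed, err)
}
```

In this sketch the third command never runs, because the loop returns as soon as the second one fails. Wrapping the error with %w keeps the original cause available to errors.Is and errors.As.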

NewNeo4jClient

func NewNeo4jClient(ctx context.Context, uri, username, password string) (*Neo4jClient, error)

NewNeo4jClient creates a new Neo4jClient.

Parameters:

  • ctx: The context for managing the lifecycle of the client.

  • uri: The URI of the Neo4j database.

  • username: The username for authentication.

  • password: The password for authentication.

Returns:

  • A pointer to the created Neo4jClient.

  • An error if the client creation fails.

Close

func (c *Neo4jClient) Close(ctx context.Context) error

Close closes the Neo4j Driver and cleans up resources.

Parameters:

  • ctx: The context for managing the lifecycle of the close operation.

Returns:

  • An error if the close operation fails.

Ping

func (c *Neo4jClient) Ping(ctx context.Context) error

Ping checks connectivity to the Neo4j database.

Parameters:

  • ctx: The context for managing the lifecycle of the ping operation.

Returns:

  • An error if the ping operation fails.

ExecuteWrite

func (c *Neo4jClient) ExecuteWrite(ctx context.Context, database, cypher string, params map[string]interface{}) error

ExecuteWrite executes a single write query on the specified database.

Parameters:

  • ctx: The context for managing the lifecycle of the write operation.

  • database: The name of the database to execute the query on.

  • cypher: The Cypher query to execute.

  • params: The parameters for the Cypher query.

Returns:

  • An error if the write operation fails.

ExecuteBatch

func (c *Neo4jClient) ExecuteBatch(ctx context.Context, database string, commands []command.CypherCommand) error

ExecuteBatch executes a batch of Cypher commands sequentially in a single transaction.

Parameters:

  • ctx: The context for managing the lifecycle of the batch operation.

  • database: The name of the database to execute the commands on.

  • commands: A slice of CypherCommand structs representing the commands to execute.

Returns:

  • An error if the batch operation fails.

ExecuteRead

func (c *Neo4jClient) ExecuteRead(ctx context.Context, database string, txFunc func(neo4j.ManagedTransaction) error) error

ExecuteRead executes a single read query with a custom transaction function.

Parameters:

  • ctx: The context for managing the lifecycle of the read operation.

  • database: The name of the database to execute the query on.

  • txFunc: A function that defines the read transaction logic.

Returns:

  • An error if the read operation fails.

NewGraphWorkerPool

func NewGraphWorkerPool(client *Neo4jClient, workers int, queueSize int, logger *logrus.Logger) *GraphWorkerPool

NewGraphWorkerPool initializes a new GraphWorkerPool.

Parameters:

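  • client: The Neo4jClient used by the pool.

  • workers: The number of workers in the pool.

  • queueSize: The size of the job queue.

  • logger: The logrus logger used by the pool.

Returns: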

  • *GraphWorkerPool: The new instance.

Start

func (wp *GraphWorkerPool) Start()

Start launches the worker pool.

Enqueue

func (wp *GraphWorkerPool) Enqueue(job GraphBatchJob)

Enqueue adds a new job to the JobQueue.

Parameters:

  • job: The job to add to the worker queue.

WaitForJobs

func (wp *GraphWorkerPool) WaitForJobs()

WaitForJobs waits until all jobs have been processed.

Stop

func (wp *GraphWorkerPool) Stop()

Stop gracefully shuts down the worker pool.
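
The Start, Enqueue, WaitForJobs, and Stop lifecycle can be illustrated with a small channel-based pool. This is a minimal sketch under stated assumptions rather than the package's implementation: the pool type, its fields, the job type, and runDemo are all hypothetical, and the real GraphWorkerPool may manage its queue and shutdown differently.

```go
package main

import (
	"fmt"
	"sync"
)

// job is a hypothetical stand-in for GraphBatchJob.
type job func() error

// pool is a minimal worker pool; the field names are illustrative and do
// not come from the documented package.
type pool struct {
	workers int
	queue   chan job
	pending sync.WaitGroup // jobs enqueued but not yet finished
	running sync.WaitGroup // worker goroutines still alive
	mu      sync.Mutex
	failed  int
}

func newPool(workers, queueSize int) *pool {
	return &pool{workers: workers, queue: make(chan job, queueSize)}
}

// Start launches the worker goroutines.
func (p *pool) Start() {
	for i := 0; i < p.workers; i++ {
		p.running.Add(1)
		go func() {
			defer p.running.Done()
			for j := range p.queue {
				if err := j(); err != nil {
					p.mu.Lock()
					p.failed++
					p.mu.Unlock()
				}
				p.pending.Done()
			}
		}()
	}
}

// Enqueue adds a job; it blocks while the queue is full.
func (p *pool) Enqueue(j job) {
	p.pending.Add(1)
	p.queue <- j
}

// WaitForJobs blocks until every enqueued job has finished.
func (p *pool) WaitForJobs() { p.pending.Wait() }

// Stop closes the queue and waits for the workers to exit.
func (p *pool) Stop() {
	close(p.queue)
	p.running.Wait()
}

// runDemo pushes n jobs through a pool and returns how many failed.
// Every fifth job (0, 5, 10, ...) is made to fail on purpose.
func runDemo(n int) int {
	p := newPool(3, 4)
	p.Start()
	for i := 0; i < n; i++ {
		i := i
		p.Enqueue(func() error {
			if i%5 == 0 {
				return fmt.Errorf("job %d failed", i)
			}
			return nil
		})
	}
	p.WaitForJobs()
	p.Stop()
	return p.failed
}

func main() {
	fmt.Println("failed jobs:", runDemo(10))
}
```

The order of calls matters in this sketch: Start must come before Enqueue can make progress on a full queue, and Enqueue must not be called after Stop, because sending on a closed channel panics.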

NewRelationalClient

func NewRelationalClient(dsn string) (*RelationalClient, error)

NewRelationalClient initializes and returns a new RelationalClient.

Parameters:

  • dsn: Data source name.

Returns:

  • *RelationalClient: The new instance.

  • error: Any errors from the function.

Ping

func (c *RelationalClient) Ping() error

Ping checks connectivity to the database.

Close

func (c *RelationalClient) Close() error

Close closes the database connection.

Execute

func (c *RelationalClient) Execute(ctx context.Context, query string, params ...interface{}) error

Execute runs a single SQL command.

Parameters:

  • ctx: The context for the query.

  • query: The actual query string.

  • params: Variadic parameter for query parameters.

Returns:

  • error: Any errors from the method.

Query

func (c *RelationalClient) Query(ctx context.Context, query string, params ...interface{}) ([]map[string]interface{}, error)

Query performs a query and returns results as a slice of maps.

Parameters:

  • ctx: The context for the query.

  • query: The actual query string.

  • params: Variadic parameter for query parameters.

Returns:

  • []map[string]interface{}: A slice of maps that represents the results from the query.

  • error: Any errors from the method.
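
Because Query returns each row as a map of interface{} values, callers need type assertions to read a column. The following is a minimal sketch of that pattern; the "name" column, the names helper, and the sample rows are hypothetical, and the concrete Go types that arrive for a given column depend on the database driver in use.

```go
package main

import "fmt"

// names extracts the "name" column from rows shaped like the result of
// Query. The column name is an assumption made for this sketch.
func names(rows []map[string]interface{}) []string {
	out := make([]string, 0, len(rows))
	for _, row := range rows {
		// Some drivers deliver text columns as string, others as []byte,
		// so both cases are handled. NULL (nil) values are skipped.
		switch v := row["name"].(type) {
		case string:
			out = append(out, v)
		case []byte:
			out = append(out, string(v))
		}
	}
	return out
}

func main() {
	// Sample data standing in for a Query result.
	rows := []map[string]interface{}{
		{"id": int64(1), "name": "alice"},
		{"id": int64(2), "name": []byte("bob")},
		{"id": int64(3), "name": nil},
	}
	fmt.Println(names(rows))
}
```

A type switch avoids the panic that a plain assertion such as row["name"].(string) would cause when the value is nil or has another type.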

BeginTx

func (c *RelationalClient) BeginTx(ctx context.Context) (*sql.Tx, error)

BeginTx starts a new database transaction.

ExecuteBatch

func (c *RelationalClient) ExecuteBatch(ctx context.Context, commands []RelationalBatchJob) error

ExecuteBatch executes a batch of SQL commands in a single transaction.

Parameters:

  • ctx: The context for the queries.

  • commands: A slice of RelationalBatchJobs.

Returns:

  • error: Any errors from the method.
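
Running a batch in a single transaction usually means that either every command is committed or none is. The following is a minimal sketch of that pattern, assuming rollback on the first error; the tx interface is a simplified, hypothetical subset of what *sql.Tx offers, and runBatch and fakeTx are illustrative names, not part of the documented package.

```go
package main

import (
	"errors"
	"fmt"
)

// tx is a hypothetical, simplified view of a database transaction.
type tx interface {
	Exec(query string) error
	Commit() error
	Rollback() error
}

// runBatch executes the queries in order. The first failure rolls the
// transaction back; otherwise the transaction is committed.
func runBatch(t tx, queries []string) error {
	for i, q := range queries {
		if err := t.Exec(q); err != nil {
			// Undo everything done so far and report the original error.
			if rbErr := t.Rollback(); rbErr != nil {
				return fmt.Errorf("query %d failed: %v; rollback failed: %w", i, err, rbErr)
			}
			return fmt.Errorf("query %d failed: %w", i, err)
		}
	}
	return t.Commit()
}

// fakeTx records what happened so the flow can be observed without a database.
type fakeTx struct {
	executed   []string
	committed  bool
	rolledBack bool
}

func (f *fakeTx) Exec(q string) error {
	if q == "FAIL" {
		return errors.New("simulated failure")
	}
	f.executed = append(f.executed, q)
	return nil
}

func (f *fakeTx) Commit() error   { f.committed = true; return nil }
func (f *fakeTx) Rollback() error { f.rolledBack = true; return nil }

func main() {
	ok := &fakeTx{}
	fmt.Println(runBatch(ok, []string{"INSERT 1", "INSERT 2"}), ok.committed, ok.rolledBack)

	bad := &fakeTx{}
	fmt.Println(runBatch(bad, []string{"INSERT 1", "FAIL"}), bad.committed, bad.rolledBack)
}
```

With database/sql, the same shape would typically start from the transaction returned by BeginTx and call its ExecContext, Commit, and Rollback methods.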

NewRelationalWorkerPool

func NewRelationalWorkerPool(db *sql.DB, workers, queueSize int, logger *logrus.Logger) *RelationalWorkerPool

NewRelationalWorkerPool initializes a new RelationalWorkerPool.

Start

func (wp *RelationalWorkerPool) Start()

Start launches the worker pool.

worker

func (wp *RelationalWorkerPool) worker(id int)

worker processes jobs from the JobQueue.

Enqueue

func (wp *RelationalWorkerPool) Enqueue(job RelationalBatchJob)

Enqueue adds a new job to the JobQueue.

WaitForJobs

func (wp *RelationalWorkerPool) WaitForJobs()

WaitForJobs waits until all jobs have been processed.

Stop

func (wp *RelationalWorkerPool) Stop()

Stop gracefully shuts down the worker pool.

Last modified: 28 January 2025