streamsx.hdfs package

HDFS integration for IBM Streams

For details on implementing applications in Python for IBM Streams, including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud, see:

This package exposes SPL operators in the com.ibm.streamsx.hdfs toolkit as Python methods.

Overview

Provides functions to access files on HDFS.

Use this package with the following services on IBM Cloud:

  • Streaming Analytics service

  • Analytics Engine service

Credentials

“Analytics Engine” credentials are defined using service credentials JSON.

The mandatory JSON elements are “user”, “password” and “webhdfs”:

{
    "user": "<USER>"
    "password": "<PASSWORD>",
    "webhdfs": "https://<HOST>:<PORT>"
}

If you are using HDFS servers other than the “Analytics Engine” service, you can provide a configuration file (hdfs-site.xml or core-site.xml) to configure the connection.
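
In that case the configuration file is passed where credentials are expected; it is then bundled into the application's etc directory. A minimal sketch, assuming the path below is replaced by the location of your cluster's hdfs-site.xml:

import streamsx.hdfs as hdfs
from streamsx.topology.topology import Topology

topo = Topology('HDFSConfigFileSample')
# path to the cluster configuration file (placeholder); the file is copied
# into the 'etc' directory of the application bundle at build time
hdfs_cfg_file = '/path/to/hdfs-site.xml'
scanned = hdfs.scan(topo, credentials=hdfs_cfg_file, directory='/user/streamsadmin/testDir')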

Sample

A simple hello world example of a Streams application that writes string messages to a file on HDFS, then scans the HDFS directory for the created file and reads its content:

from streamsx.topology.topology import *
from streamsx.topology.schema import CommonSchema, StreamSchema
from streamsx.topology.context import submit
import streamsx.hdfs as hdfs
import json

# credentials_analytics_engine_service is an open file containing the service credentials JSON
credentials = json.load(credentials_analytics_engine_service)

topo = Topology('HDFSHelloWorld')

to_hdfs = topo.source(['Hello', 'World!'])
to_hdfs = to_hdfs.as_string()

# Write a stream to HDFS
hdfs.write(to_hdfs, credentials=credentials, file='/sample/hw.txt')

scanned = hdfs.scan(topo, credentials=credentials, directory='/sample', init_delay=10)

# read text file line by line
r = hdfs.read(scanned, credentials=credentials)

# print each line (tuple)
r.print()

submit('STREAMING_ANALYTICS_SERVICE', topo)
# Use for IBM Streams including IBM Cloud Pak for Data
# submit('DISTRIBUTED', topo)

class streamsx.hdfs.HdfsDirectoryScan(credentials, directory, pattern=None, initDelay=None, schema=<CommonSchema.String: <streamsx.topology.schema.StreamSchema object>>, **options)

Watches an HDFS directory, and generates file names on the output, one for each file that is found in the directory.

Example, scanning for files in testDir directory:

import streamsx.hdfs as hdfs
from streamsx.topology.topology import Topology

topo = Topology()
dir = '/user/streamsadmin/testDir'
config = {
    'initDelay': 2.0,
    'sleepTime' : 2.0,
    'pattern' : 'sample.*txt'
}

# credentials as described in the Overview section
s = topo.source(hdfs.HdfsDirectoryScan(credentials=credentials, directory=dir, **config))

Example, scanning for files with “csv” file extension:

s = topo.source(hdfs.HdfsDirectoryScan(credentials=credentials, directory=dir, pattern='.*\.csv$'))

credentials

The credentials of the Hadoop cluster as a dict or JSON string that contains the hdfs credentials key/value pairs for user, password and webhdfs.

Type

dict|str

directory

Specifies the name of the directory to be scanned

Type

str|Expression

pattern

Instructs the operator to ignore file names that do not match the regular expression pattern

Type

str

initDelay

initDelay specifies the time to wait in seconds before the operator reads the first file. The default value is 0.

Type

int

schema

Output schema, defaults to CommonSchema.String

Type

StreamSchema

options

The additional optional parameters as variable keyword arguments.

Type

kwargs

property appConfigName

The optional parameter appConfigName specifies the name of the application configuration that contains the HDFS connection related configuration parameter credentials.

Type

str

property authKeytab

The optional parameter authKeytab specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.

Type

str

property authPrincipal

The optional parameter authPrincipal specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the IBM Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.

Type

str
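
Kerberos authentication can be enabled by passing both parameters as keyword arguments. A minimal sketch; the principal and keytab path are placeholders, and credentials is assumed to be defined as in the other examples:

import streamsx.hdfs as hdfs
from streamsx.topology.topology import Topology

topo = Topology('KerberosDirScan')
# placeholder values: the keytab is generated by the Hadoop administrator and
# the principal is the one created for the IBM Streams instance owner
kerberos_cfg = {
    'authPrincipal': 'streamsadmin@EXAMPLE.COM',
    'authKeytab': '/etc/security/keytabs/streamsadmin.keytab'
}
s = topo.source(hdfs.HdfsDirectoryScan(credentials=credentials, directory='/user/streamsadmin/testDir', **kerberos_cfg))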

property configPath

The optional parameter configPath specifies the path to the directory that contains the HDFS configuration file core-site.xml .

Type

str

property credFile

The optional parameter credFile specifies a file that contains login credentials. The credentials are used to connect to WEBHDFS remotely by using the schema webhdfs://hdfshost:webhdfsport. The credentials file must be a valid JSON string and must contain the hdfs credentials key/value pairs for user, password and webhdfs in JSON format.

Type

str

property credentials

The optional parameter credentials specifies the JSON string that contains the hdfs credentials key/value pairs for user, password and webhdfs.

Type

str

property directory

This optional parameter specifies the name of the directory to be scanned. If the name starts with a slash, it is considered an absolute directory that you want to scan. If it does not start with a slash, it is considered a relative directory, relative to the /user/userid/ directory. This parameter is mandatory if the input port is not specified.

Type

str

property hdfsPassword

The parameter hdfsPassword specifies the password to use when connecting to a Hadoop instance via WEBHDFS.

Type

str

property hdfsUri

The parameter hdfsUri specifies the uniform resource identifier (URI) that you can use to connect to the HDFS file system.

Type

str

property hdfsUser

The parameter hdfsUser specifies the user ID to use when you connect to the HDFS file system. If this parameter is not specified, the operator uses the instance owner ID to connect to HDFS.

Type

str

property initDelay

The parameter initDelay specifies the time to wait in seconds before the operator HDFS2DirectoryScan reads the first file. The default value is 0.

Type

float

property keyStorePassword

The optional parameter keyStorePassword is only supported when connecting to HDFS via WEBHDFS. It specifies the password for the keystore file.

Type

str

property keyStorePath

The optional parameter keyStorePath is only supported when connecting to HDFS via WEBHDFS. It specifies the path to the keystore file, which is in PEM format. The keystore file is used when making a secure connection to the HDFS server and must contain the public certificate of the HDFS server that will be connected to.

Type

str

property libPath

The optional parameter libPath specifies the absolute path to the directory that contains the Hadoop library files.

Type

str

property pattern

The optional parameter pattern limits the file names that are listed to the names that match the specified regular expression. The HDFS2DirectoryScan operator ignores file names that do not match the specified regular expression.

Type

str

property policyFilePath

The optional parameter policyFilePath is relevant when connecting to IBM Analytics Engine on IBM Cloud. It specifies the path to the directory that contains the Java Cryptography Extension policy files (US_export_policy.jar and local_policy.jar).

Type

str

populate(topology, name, **options)

Populate the topology with this composite source. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

topo = Topology()
source_stream = topo.source(mySourceComposite)
Parameters
  • topology – Topology containing the source.

  • name – Name passed into source.

  • **options – Future options passed to source.

Returns

Single stream representing the source.

Return type

Stream

property reconnectionBound

The optional parameter reconnectionBound specifies the number of successive connection attempts that occur when a connection fails or a disconnect occurs. It is used only when the reconnectionPolicy parameter is set to BoundedRetry; otherwise, it is ignored. The default value is 5.

Type

int

property reconnectionInterval

The optional parameter reconnectionInterval specifies the amount of time (in seconds) that the operator waits between successive connection attempts. It is used only when the reconnectionPolicy parameter is set to BoundedRetry or InfiniteRetry; otherwise, it is ignored. The default value is 10.

Type

int

property reconnectionPolicy

The optional parameter reconnectionPolicy specifies the policy that is used by the operator to handle HDFS connection failures. The valid values are NoRetry, InfiniteRetry, and BoundedRetry. The default value is BoundedRetry.

Type

str
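
For example, the reconnection behavior can be tuned with keyword arguments. A minimal sketch, assuming credentials is defined as in the Overview:

import streamsx.hdfs as hdfs
from streamsx.topology.topology import Topology

topo = Topology('DirScanWithRetry')
# retry a failed HDFS connection up to 3 times, waiting 30 seconds between attempts
reconnect_cfg = {
    'reconnectionPolicy': 'BoundedRetry',
    'reconnectionBound': 3,
    'reconnectionInterval': 30
}
s = topo.source(hdfs.HdfsDirectoryScan(credentials=credentials, directory='/user/streamsadmin/testDir', **reconnect_cfg))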

property sleepTime

The parameter sleepTime specifies the minimum time between directory scans. The default value is 5.0 seconds.

Type

float

property strictMode

The optional parameter strictMode determines whether the operator reports an error if the directory to be scanned does not exist.

Type

bool

property vmArg

The optional parameter vmArg specifies additional JVM arguments that are required by the specific invocation of the operator.

Type

str

class streamsx.hdfs.HdfsFileSink(credentials, file, **options)

Writes a stream to a file on HDFS.

Note

Only the last component of the path name is created if it does not exist. All directories in the path name up to the last component must exist.

Example for writing a stream to a file:

import streamsx.hdfs as hdfs
from streamsx.topology.topology import Topology

topo = Topology()
s = topo.source(['Hello', 'World!']).as_string()
s.for_each(hdfs.HdfsFileSink(credentials=credentials, file='/user/hdfs/data.txt'))

Example specifying additional parameters as kwargs and constructing the file name with an SPL expression containing the %FILENUM variable:

import streamsx.spl.op

config = {
    'hdfsUser': 'hdfs',
    'tuplesPerFile': 50000
}
fsink = hdfs.HdfsFileSink(credentials=credentials, file=streamsx.spl.op.Expression.expression('pytest1/sample4%FILENUM.txt'), **config)
to_file.for_each(fsink)

credentials

The credentials of the Hadoop cluster as a dict or JSON string that contains the hdfs credentials key/value pairs for user, password and webhdfs.

Type

dict|str

file

Name of the output file.

Type

str

options

The additional optional parameters as variable keyword arguments.

Type

kwargs

property appConfigName

The optional parameter appConfigName specifies the name of the application configuration that contains the HDFS connection related configuration parameter credentials.

Type

str

property authKeytab

The optional parameter authKeytab specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.

Type

str

property authPrincipal

The optional parameter authPrincipal specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the IBM Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.

Type

str

property bytesPerFile

This parameter specifies the approximate size of the output file, in bytes. When the file size exceeds the specified number of bytes, the current output file is closed and a new file is opened. The bytesPerFile, timePerFile, and tuplesPerFile parameters are mutually exclusive; you can specify only one of these parameters at a time.

Type

int

property closeOnPunct

This parameter specifies whether the operator closes the current output file and creates a new file when a punctuation marker is received. The default value is false .

Type

bool

property configPath

The optional parameter configPath specifies the path to the directory that contains the HDFS configuration file core-site.xml .

Type

str

property credFile

The optional parameter credFile specifies a file that contains login credentials. The credentials are used to connect to WEBHDFS remotely by using the schema webhdfs://hdfshost:webhdfsport. The credentials file must be a valid JSON string and must contain the hdfs credentials key/value pairs for user, password and webhdfs in JSON format.

Type

str

property credentials

The optional parameter credentials specifies the JSON string that contains the hdfs credentials key/value pairs for user, password and webhdfs.

Type

str

property encoding

This optional parameter specifies the encoding to use when writing files. The default value is UTF-8.

Type

str

property file

This parameter specifies the name of the file that the operator writes to. The file parameter can optionally contain the following variables, which the operator evaluates at runtime to generate the file name:

  • %HOST The host that is running the processing element (PE) of this operator.

  • %FILENUM The file number, which starts at 0 and counts up as a new file is created for writing.

  • %PROCID The process ID of the processing element.

  • %PEID The processing element ID.

  • %PELAUNCHNUM The PE launch count.

  • %TIME The time when the file is created. If the timeFormat parameter is not specified, the default time format is yyyyMMdd_HHmmss.

Type

str
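
A minimal sketch of a sink that rolls to a new file every hour and embeds the host name and creation time in the file name; credentials and the input stream to_hdfs are assumed to be defined as in the examples above, and the timeFormat value is an assumed java.text.SimpleDateFormat pattern:

import streamsx.hdfs as hdfs

# close the current file and open a new one roughly every hour
fsink = hdfs.HdfsFileSink(
    credentials=credentials,
    file='/user/hdfs/logs/%HOST-%TIME.txt',
    timeFormat='yyyyMMdd_HH',
    timePerFile=3600.0
)
to_hdfs.for_each(fsink)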

property fileAttributeName

If set, this points to an attribute containing the file name. The operator closes a file when the value of this attribute changes. If the string contains substitutions, the check for a change happens before the substitutions, and the file name contains the substitutions based on the first tuple.

Type

str
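
A minimal sketch of a sink that picks the target file name from a tuple attribute; the schema, the attribute name fname, and the records stream are placeholders, and whether file may be passed as None when fileAttributeName is set is an assumption of this sketch:

import streamsx.hdfs as hdfs
from streamsx.topology.schema import StreamSchema

# each tuple carries the target file name in attribute 'fname' and the payload in 'line'
in_schema = StreamSchema('tuple<rstring fname, rstring line>')
fsink = hdfs.HdfsFileSink(credentials=credentials, file=None, fileAttributeName='fname')
records.for_each(fsink)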

property hdfsPassword

The parameter hdfsPassword specifies the password to use when connecting to a Hadoop instance via WEBHDFS.

Type

str

property hdfsUri

The parameter hdfsUri specifies the uniform resource identifier (URI) that you can use to connect to the HDFS file system.

Type

str

property hdfsUser

The parameter hdfsUser specifies the user ID to use when you connect to the HDFS file system. If this parameter is not specified, the operator uses the instance owner ID to connect to HDFS.

Type

str

property keyStorePassword

The optional parameter keyStorePassword is only supported when connecting to HDFS via WEBHDFS. It specifies the password for the keystore file.

Type

str

property keyStorePath

The optional parameter keyStorePath is only supported when connecting to HDFS via WEBHDFS. It specifies the path to the keystore file, which is in PEM format. The keystore file is used when making a secure connection to the HDFS server and must contain the public certificate of the HDFS server that will be connected to.

Type

str

property libPath

The optional parameter libPath specifies the absolute path to the directory that contains the Hadoop library files.

Type

str

property policyFilePath

The optional parameter policyFilePath is relevant when connecting to IBM Analytics Engine on IBM Cloud. It specifies the path to the directory that contains the Java Cryptography Extension policy files (US_export_policy.jar and local_policy.jar).

Type

str

populate(topology, stream, name, **options) → streamsx.topology.topology.Sink

Populate the topology with this composite for each transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

sink = input_stream.for_each(myForEachComposite)
Parameters
  • topology – Topology containing the composite map.

  • stream – Stream to be transformed.

  • name – Name passed into for_each.

  • **options – Future options passed to for_each.

Returns

Termination for this composite transformation of stream.

Return type

Sink

property reconnectionBound

The optional parameter reconnectionBound specifies the number of successive connection attempts that occur when a connection fails or a disconnect occurs. It is used only when the reconnectionPolicy parameter is set to BoundedRetry; otherwise, it is ignored. The default value is 5.

Type

int

property reconnectionInterval

The optional parameter reconnectionInterval specifies the amount of time (in seconds) that the operator waits between successive connection attempts. It is used only when the reconnectionPolicy parameter is set to BoundedRetry or InfiniteRetry; otherwise, it is ignored. The default value is 10.

Type

int

property reconnectionPolicy

The optional parameter reconnectionPolicy specifies the policy that is used by the operator to handle HDFS connection failures. The valid values are NoRetry, InfiniteRetry, and BoundedRetry. The default value is BoundedRetry.

Type

str

property tempFile

This parameter specifies the name of the file that the operator writes to. When the file is closed the file is renamed to the final filename defined by the file parameter or fileAttributeName parameter.

Type

str

property timeFormat

This parameter specifies the time format to use when the file parameter value contains %TIME . The parameter value must contain conversion specifications that are supported by the java.text.SimpleDateFormat. The default format is yyyyMMdd_HHmmss .

Type

str

property timePerFile

This parameter specifies the approximate time, in seconds, after which the current output file is closed and a new file is opened for writing. The bytesPerFile, timePerFile, and tuplesPerFile parameters are mutually exclusive; you can specify only one of these parameters.

Type

float

property tuplesPerFile

This parameter specifies the maximum number of tuples that can be received for each output file. When the specified number of tuples are received, the current output file is closed and a new file is opened for writing. The bytesPerFile, timePerFile, and tuplesPerFile parameters are mutually exclusive; you can specify only one of these parameters at a time.

Type

int

property vmArg

The optional parameter vmArg specifies additional JVM arguments that are required by the specific invocation of the operator.

Type

str

class streamsx.hdfs.HdfsFileSource(credentials, schema=<CommonSchema.String: <streamsx.topology.schema.StreamSchema object>>, **options)

Reads the HDFS files named by the input stream and generates tuples with the file content on the output stream.

Example, scanning for HDFS files in the pytest directory and reading the files via HdfsFileSource:

import streamsx.hdfs as hdfs
from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema

topo = Topology()
dir_scan_output_schema = StreamSchema('tuple<rstring fileName>')

dirScanParameters = {
    'initDelay': 2.0,
    'sleepTime' : 2.0,
    'pattern' : 'sample.*txt'
}

# HdfsDirectoryScan scans directory 'pytest' and delivers HDFS file names on its output port.
scannedFileNames = topo.source(hdfs.HdfsDirectoryScan(credentials=hdfs_cfg_file, directory='pytest', schema=dir_scan_output_schema, **dirScanParameters))
scannedFileNames.print()

sourceParameters = {
    'initDelay': 1.0
}

source_schema = StreamSchema('tuple<rstring line>')

# HdfsFileSource reads the HDFS files named on its input port and emits the lines of the files on its output port
readLines = scannedFileNames.map(hdfs.HdfsFileSource(credentials=hdfs_cfg_file, schema=source_schema, **sourceParameters))

credentials

The credentials of the Hadoop cluster as a dict or JSON string that contains the hdfs credentials key/value pairs for user, password and webhdfs.

Type

dict|str

schema

Output schema, defaults to CommonSchema.String

Type

StreamSchema

options

The additional optional parameters as variable keyword arguments.

Type

kwargs

property appConfigName

The optional parameter appConfigName specifies the name of the application configuration that contains the HDFS connection related configuration parameter credentials.

Type

str

property authKeytab

The optional parameter authKeytab specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.

Type

str

property authPrincipal

The optional parameter authPrincipal specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the IBM Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.

Type

str

property blockSize

The optional parameter blockSize specifies the maximum number of bytes to be read at one time when reading a file in binary mode (i.e., into a blob); thus, it is the maximum size of the blobs on the output stream. The parameter is optional, and defaults to 4096.

Type

int
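
For example, to read files in binary mode as blobs of at most 64 KB per tuple; a minimal sketch where the schema attribute name contents is a placeholder and scannedFileNames is the stream of file names from the example above:

import streamsx.hdfs as hdfs
from streamsx.topology.schema import StreamSchema

# binary output schema: each tuple carries one chunk of file content
blob_schema = StreamSchema('tuple<blob contents>')
chunks = scannedFileNames.map(hdfs.HdfsFileSource(credentials=hdfs_cfg_file, schema=blob_schema, blockSize=65536))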

property configPath

The optional parameter configPath specifies the path to the directory that contains the HDFS configuration file core-site.xml .

Type

str

property credFile

The optional parameter credFile specifies a file that contains login credentials. The credentials are used to connect to WEBHDFS remotely by using the schema webhdfs://hdfshost:webhdfsport. The credentials file must be a valid JSON string and must contain the hdfs credentials key/value pairs for user, password and webhdfs in JSON format.

Type

str

property credentials

The optional parameter credentials specifies the JSON string that contains the hdfs credentials key/value pairs for user, password and webhdfs.

Type

str

property encoding

This optional parameter specifies the encoding to use when reading files. The default value is UTF-8 .

Type

str

property file

This parameter specifies the name of the file that the operator opens and reads. This parameter must be specified when the optional input port is not configured. If the optional input port is used and the file name is specified, the operator generates an error.

Type

str

property hdfsPassword

The parameter hdfsPassword specifies the password to use when connecting to a Hadoop instance via WEBHDFS.

Type

str

property hdfsUri

The parameter hdfsUri specifies the uniform resource identifier (URI) that you can use to connect to the HDFS file system.

Type

str

property hdfsUser

The parameter hdfsUser specifies the user ID to use when you connect to the HDFS file system. If this parameter is not specified, the operator uses the instance owner ID to connect to HDFS.

Type

str

property initDelay

The parameter initDelay specifies the time to wait in seconds before the operator HDFS2FileSource reads the first file. The default value is 0.

Type

float

property keyStorePassword

The optional parameter keyStorePassword is only supported when connecting to HDFS via WEBHDFS. It specifies the password for the keystore file.

Type

str

property keyStorePath

The optional parameter keyStorePath is only supported when connecting to HDFS via WEBHDFS. It specifies the path to the keystore file, which is in PEM format. The keystore file is used when making a secure connection to the HDFS server and must contain the public certificate of the HDFS server that will be connected to.

Type

str

property libPath

The optional parameter libPath specifies the absolute path to the directory that contains the Hadoop library files.

Type

str

property policyFilePath

The optional parameter policyFilePath is relevant when connecting to IBM Analytics Engine on IBM Cloud. It specifies the path to the directory that contains the Java Cryptography Extension policy files (US_export_policy.jar and local_policy.jar).

Type

str

populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

transformed_stream = input_stream.map(myMapComposite)
Parameters
  • topology – Topology containing the composite map.

  • stream – Stream to be transformed.

  • schema – Schema passed into map.

  • name – Name passed into map.

  • **options – Future options passed to map.

Returns

Single stream representing the transformation of stream.

Return type

Stream

property reconnectionBound

The optional parameter reconnectionBound specifies the number of successive connection attempts that occur when a connection fails or a disconnect occurs. It is used only when the reconnectionPolicy parameter is set to BoundedRetry; otherwise, it is ignored. The default value is 5.

Type

int

property reconnectionInterval

The optional parameter reconnectionInterval specifies the amount of time (in seconds) that the operator waits between successive connection attempts. It is used only when the reconnectionPolicy parameter is set to BoundedRetry or InfiniteRetry; otherwise, it is ignored. The default value is 10.

Type

int

property reconnectionPolicy

The optional parameter reconnectionPolicy specifies the policy that is used by the operator to handle HDFS connection failures. The valid values are NoRetry, InfiniteRetry, and BoundedRetry. The default value is BoundedRetry.

Type

str

property vmArg

The optional parameter vmArg specifies additional JVM arguments that are required by the specific invocation of the operator.

Type

str

class streamsx.hdfs.HdfsFileCopy(credentials, direction, schema=<CommonSchema.String: <streamsx.topology.schema.StreamSchema object>>, **options)

Copies files from HDFS to the local file system, or from the local file system to HDFS. The files to copy are named by attributes of the input stream, and the result of each copy operation is emitted on the output stream.

Example, scanning for HDFS files in the pytest directory and copying them to the local disk via HdfsFileCopy:

import streamsx.hdfs as hdfs
from streamsx.topology.topology import Topology
from streamsx.topology.schema import StreamSchema

topo = Topology()
dir_scan_output_schema = StreamSchema('tuple<rstring hdfsFileName>')

dirScanParameters = {
    'initDelay': 2.0,
    'sleepTime' : 2.0,
    'pattern' : 'sample.*txt'
}
# HdfsDirectoryScan scans directory 'pytest' and delivers HDFS file names on its output port.
scannedFileNames = topo.source(hdfs.HdfsDirectoryScan(credentials=credentials, directory='pytest', schema=dir_scan_output_schema, **dirScanParameters))

scannedFileNames.print()

fileCopyParameters = {
    'hdfsFileAttrName': 'hdfsFileName',
    'localFile' : '/tmp/'
}

output_schema = StreamSchema('tuple<rstring result, uint64 numResults>')

# HdfsFileCopy copies the HDFS files from directory 'pytest' into the local directory /tmp
copyFiles = scannedFileNames.map(hdfs.HdfsFileCopy(credentials=credentials, direction='copyToLocalFile', schema=output_schema, **fileCopyParameters))

credentials

The credentials of the Hadoop cluster as a dict or JSON string that contains the hdfs credentials key/value pairs for user, password and webhdfs.

Type

dict|str

direction

This parameter specifies the direction of copy. The parameter can be set with the following values: ‘copyFromLocalFile’ copies a file from the local disk to the HDFS file system; ‘copyToLocalFile’ copies a file from the HDFS file system to the local disk.

Type

str

schema

Output schema, defaults to CommonSchema.String

Type

StreamSchema

options

The additional optional parameters as variable keyword arguments.

Type

kwargs

property appConfigName

The optional parameter appConfigName specifies the name of the application configuration that contains the HDFS connection related configuration parameter credentials.

Type

str

property authKeytab

The optional parameter authKeytab specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.

Type

str

property authPrincipal

The optional parameter authPrincipal specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the IBM Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.

Type

str

property blockSize

The optional parameter blockSize specifies the maximum number of bytes to be read at one time when reading a file in binary mode (i.e., into a blob); thus, it is the maximum size of the blobs on the output stream. The parameter is optional, and defaults to 4096.

Type

int

property configPath

The optional parameter configPath specifies the path to the directory that contains the HDFS configuration file core-site.xml .

Type

str

property credFile

The optional parameter credFile specifies a file that contains login credentials. The credentials are used to connect to WEBHDFS remotely by using the schema webhdfs://hdfshost:webhdfsport. The credentials file must be a valid JSON string and must contain the hdfs credentials key/value pairs for user, password and webhdfs in JSON format.

Type

str

property credentials

The optional parameter credentials specifies the JSON string that contains the hdfs credentials key/value pairs for user, password and webhdfs.

Type

str

property deleteSourceFile

The optional parameter deleteSourceFile specifies whether to delete the source file when processing is finished.

Type

bool

property direction

This parameter specifies the direction of copy. The parameter can be set with the following values: copyFromLocalFile copies a file from the local disk to the HDFS file system; copyToLocalFile copies a file from the HDFS file system to the local disk.

Type

str
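
For the opposite direction, a minimal sketch of uploading local files named on the input stream into an HDFS directory; the attribute name localFileName and the stream localFileNames are placeholders, and credentials is assumed to be defined as above:

import streamsx.hdfs as hdfs
from streamsx.topology.schema import StreamSchema

result_schema = StreamSchema('tuple<rstring result, uint64 numResults>')

upload_cfg = {
    'localFileAttrName': 'localFileName',
    'hdfsFile': '/user/streamsadmin/uploads/'   # trailing slash: copy into this HDFS directory
}
uploaded = localFileNames.map(hdfs.HdfsFileCopy(credentials=credentials, direction='copyFromLocalFile', schema=result_schema, **upload_cfg))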

property hdfsFile

This optional parameter specifies the name of the HDFS file or directory. If the name starts with a slash, it is considered an absolute path of the HDFS file that you want to use. If it does not start with a slash, it is considered a relative path, relative to /user/userid/hdfsFile. If you want to copy all incoming files from the input port to a directory, set the value of direction to copyFromLocalFile and set the value of this parameter to a directory with a slash at the end, e.g. /user/userid/testDir/. This parameter is mandatory if hdfsFileAttrName is not specified. The parameter hdfsFile cannot be set when the parameter hdfsFileAttrName is set.

Type

str

property hdfsFileAttrName

This optional parameter specifies the name of the input stream attribute that provides the HDFS file name. If the value starts with a slash, it is considered an absolute path of the HDFS file that you want to copy. If it does not start with a slash, it is considered a relative path, relative to /user/userid/hdfsFile. This parameter is mandatory if hdfsFile is not specified. The parameter hdfsFileAttrName cannot be set when the parameter hdfsFile is set.

Type

str

property hdfsPassword

The parameter hdfsPassword specifies the password to use when connecting to a Hadoop instance via WEBHDFS.

Type

str

property hdfsUri

The parameter hdfsUri specifies the uniform resource identifier (URI) that you can use to connect to the HDFS file system.

Type

str

property hdfsUser

The parameter hdfsUser specifies the user ID to use when you connect to the HDFS file system. If this parameter is not specified, the operator uses the instance owner ID to connect to HDFS.

Type

str

property keyStorePassword

The optional parameter keyStorePassword is only supported when connecting to HDFS via WEBHDFS. It specifies the password for the keystore file.

Type

str

property keyStorePath

The optional parameter keyStorePath is only supported when connecting to HDFS via WEBHDFS. It specifies the path to the keystore file, which is in PEM format. The keystore file is used when making a secure connection to the HDFS server and must contain the public certificate of the HDFS server that will be connected to.

Type

str

property libPath

The optional parameter libPath specifies the absolute path to the directory that contains the Hadoop library files.

Type

str

property localFile

The optional parameter localFile specifies the name of the local file to be copied. If the name starts with a slash, it is considered an absolute path of the local file that you want to copy. If it does not start with a slash, it is considered a relative path, relative to your project data directory. If you want to copy all incoming files from the input port to a directory, set the value of direction to copyToLocalFile and set the value of this parameter to a directory with a slash at the end, e.g. data/testDir/. This parameter is mandatory if localFileAttrName is not specified. The parameter localFile cannot be set when the parameter localFileAttrName is set.

Type

str

property localFileAttrName

The optional parameter localFileAttrName specifies the name of the input stream attribute that provides the local file name. If the value starts with a slash, it is considered an absolute path of the local file that you want to copy. If it does not start with a slash, it is considered a relative path, relative to your project data directory. This parameter is mandatory if localFile is not specified. The parameter localFileAttrName cannot be set when the parameter localFile is set.

Type

str

property overwriteDestinationFile

The optional parameter overwriteDestinationFile specifies whether to overwrite the destination file.

Type

bool

property policyFilePath

The optional parameter policyFilePath is relevant when connecting to IBM Analytics Engine on IBM Cloud. It specifies the path to the directory that contains the Java Cryptography Extension policy files (US_export_policy.jar and local_policy.jar).

Type

str

populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

transformed_stream = input_stream.map(myMapComposite)
Parameters
  • topology – Topology containing the composite map.

  • stream – Stream to be transformed.

  • schema – Schema passed into map.

  • name – Name passed into map.

  • **options – Future options passed to map.

Returns

Single stream representing the transformation of stream.

Return type

Stream

property reconnectionBound

The optional parameter reconnectionBound specifies the number of successive connection attempts that occur when a connection fails or a disconnect occurs. It is used only when the reconnectionPolicy parameter is set to BoundedRetry; otherwise, it is ignored. The default value is 5.

Type

int

property reconnectionInterval

The optional parameter reconnectionInterval specifies the amount of time (in seconds) that the operator waits between successive connection attempts. It is used only when the reconnectionPolicy parameter is set to BoundedRetry or InfiniteRetry; otherwise, it is ignored. The default value is 10.

Type

int

property reconnectionPolicy

The optional parameter reconnectionPolicy specifies the policy that is used by the operator to handle HDFS connection failures. The valid values are NoRetry, InfiniteRetry, and BoundedRetry. The default value is BoundedRetry.

Type

str

property vmArg

The optional parameter vmArg specifies additional JVM arguments that are required by the specific invocation of the operator.

Type

str

streamsx.hdfs.download_toolkit(url=None, target_dir=None)

Downloads the latest HDFS toolkit from GitHub.

Example for updating the HDFS toolkit for your topology with the latest toolkit from GitHub:

import streamsx.hdfs as hdfs
# download HDFS toolkit from GitHub
hdfs_toolkit_location = hdfs.download_toolkit()
# add the toolkit to topology
streamsx.spl.toolkit.add_toolkit(topology, hdfs_toolkit_location)

Example for updating the topology with a specific version of the HDFS toolkit using a URL:

import streamsx.hdfs as hdfs
url500 = 'https://github.com/IBMStreams/streamsx.hdfs/releases/download/v5.0.0/streamx.hdfs.toolkits-5.0.0-20190902-1637.tgz'
hdfs_toolkit_location = hdfs.download_toolkit(url=url500)
streamsx.spl.toolkit.add_toolkit(topology, hdfs_toolkit_location)
Parameters
  • url (str) – Link to toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.

  • target_dir (str) – the directory where the toolkit is unpacked to. If a relative path is given, the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems. If target_dir is None a location relative to the system temporary directory is chosen.

Returns

the location of the downloaded HDFS toolkit

Return type

str

Note

This function requires an outgoing Internet connection

New in version 1.1.

streamsx.hdfs.configure_connection(instance, name='hdfs', credentials=None)

Configures IBM Streams for a certain connection.

Creates or updates an application configuration object containing the required properties with connection information.

Example for creating a configuration for a Streams instance with connection details:

from icpd_core import icpd_util
from streamsx.topology import context
from streamsx.rest_primitives import Instance
import streamsx.hdfs as hdfs

cfg = icpd_util.get_service_instance_details(name='your-streams-instance')
cfg[context.ConfigParams.SSL_VERIFY] = False
instance = Instance.of_service(cfg)
app_cfg = hdfs.configure_connection(instance, credentials='my_credentials_json')
Parameters
  • instance (streamsx.rest_primitives.Instance) – IBM Streams instance object.

  • name (str) – Name of the application configuration, default name is ‘hdfs’.

  • credentials (str|dict) – The service credentials, for example Analytics Engine service credentials.

Returns

Name of the application configuration.
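
The returned name can then be referenced by the operator classes via the appConfigName parameter. A minimal sketch; whether the credentials argument may be set to None when an application configuration is used is an assumption here:

import streamsx.hdfs as hdfs
from streamsx.topology.topology import Topology

topo = Topology('AppConfigSample')
# reference the application configuration created by configure_connection() above
s = topo.source(hdfs.HdfsDirectoryScan(credentials=None, directory='pytest', appConfigName=app_cfg))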

streamsx.hdfs.scan(topology, credentials, directory, pattern=None, init_delay=None, name=None)

Scans a Hadoop Distributed File System directory for new or modified files.

Repeatedly scans a HDFS directory and writes the names of new or modified files that are found in the directory to the output stream.

Parameters
  • topology (Topology) – Topology to contain the returned stream.

  • credentials (dict|str|file) – The credentials of the IBM Cloud Analytics Engine service as JSON (dict) or JSON string (str) or the path to the configuration file (hdfs-site.xml or core-site.xml). If the configuration file is specified, then this file will be copied to the ‘etc’ directory of the application bundle.

  • directory (str) – The directory to be scanned. Relative path is relative to the ‘/user/userid/’ directory.

  • pattern (str) – Limits the file names that are listed to the names that match the specified regular expression.

  • init_delay (int|float|datetime.timedelta) – The time to wait in seconds before the operator scans the directory for the first time. If not set, then the default value is 0.

  • schema (Schema) – Optional output stream schema. Default is CommonSchema.String. Alternatively, a structured streams schema with a single attribute of type rstring is supported.

  • name (str) – Source name in the Streams context, defaults to a generated name.

Returns

Output Stream containing file names with schema DirectoryScanSchema.
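
Example, a minimal sketch that scans a directory for CSV files, with credentials defined as described in the Overview:

import streamsx.hdfs as hdfs
from streamsx.topology.topology import Topology

topo = Topology('ScanSample')
# wait 10 seconds before the first scan, then report any file matching *.csv
files = hdfs.scan(topo, credentials=credentials, directory='/user/streamsadmin/testDir', pattern='.*\.csv$', init_delay=10)
files.print()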

streamsx.hdfs.read(stream, credentials, schema=<CommonSchema.String: <streamsx.topology.schema.StreamSchema object>>, name=None)

Reads files from a Hadoop Distributed File System.

The names of the files to be read are provided by the input stream.

Parameters
  • stream (Stream) – Stream of tuples containing file names to be read. Supports CommonSchema.String as input. Alternatively, a structured streams schema with a single attribute of type rstring is supported.

  • credentials (dict|str|file) – The credentials of the IBM Cloud Analytics Engine service as JSON (dict) or JSON string (str) or the path to the configuration file (hdfs-site.xml or core-site.xml). If the configuration file is specified, then this file will be copied to the ‘etc’ directory of the application bundle.

  • schema (Schema) – Output schema for the file content, defaults to CommonSchema.String. Alternatively, a structured streams schema with a single attribute of type rstring or blob is supported.

  • name (str) – Name of the operator in the Streams context, defaults to a generated name.

Returns

Output Stream for file content. Default output schema is CommonSchema.String (line per file).
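
Example, a minimal sketch that reads each file found by scan() line by line, continuing the sketch above:

import streamsx.hdfs as hdfs

# 'files' is the stream of file names returned by hdfs.scan()
lines = hdfs.read(files, credentials=credentials)
lines.print()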

streamsx.hdfs.write(stream, credentials, file=None, fileAttributeName=None, schema=None, timePerFile=None, tuplesPerFile=None, bytesPerFile=None, name=None)

Writes files to a Hadoop Distributed File System.

If a file with the same name already exists on HDFS, it is overwritten. By default, the file is closed when a window punctuation mark is received. Different close modes can be specified with the parameters timePerFile, tuplesPerFile, and bytesPerFile.

Example with input stream of type CommonSchema.String:

import streamsx.hdfs as hdfs

s = topo.source(['Hello World!']).as_string()
result = hdfs.write(s, credentials=credentials, file='sample%FILENUM.txt')
result.print()
Parameters
  • stream (Stream) – Stream of tuples containing the data to be written to files. Supports CommonSchema.String as input. Alternatively, a structured streams schema with a single attribute of type rstring or blob is supported.

  • credentials (dict|str|file) – The credentials of the IBM Cloud Analytics Engine service as JSON (dict) or JSON string (str) or the path to the configuration file (hdfs-site.xml or core-site.xml). If the configuration file is specified, then this file will be copied to the ‘etc’ directory of the application bundle.

  • file (str) –

    Specifies the name of the file. The file parameter can optionally contain the following variables, which are evaluated at runtime to generate the file name:

    • %FILENUM The file number, which starts at 0 and counts up as a new file is created for writing.

    • %TIME The time when the file is created. The time format is yyyyMMdd_HHmmss.

    Important: If the %FILENUM or %TIME specification is not included, the file is overwritten every time a new file is created.

  • timePerFile (int|float|datetime.timedelta) – Specifies the approximate time, in seconds, after which the current output file is closed and a new file is opened for writing. The bytesPerFile, timePerFile and tuplesPerFile parameters are mutually exclusive.

  • tuplesPerFile (int) – The maximum number of tuples that can be received for each output file. When the specified number of tuples are received, the current output file is closed and a new file is opened for writing. The bytesPerFile, timePerFile and tuplesPerFile parameters are mutually exclusive.

  • bytesPerFile (int) – Approximate size of the output file, in bytes. When the file size exceeds the specified number of bytes, the current output file is closed and a new file is opened for writing. The bytesPerFile, timePerFile and tuplesPerFile parameters are mutually exclusive.

  • name (str) – Sink name in the Streams context, defaults to a generated name.

Returns

Output Stream with schema FileInfoSchema.
