streamsx.hdfs package¶
HDFS integration for IBM Streams¶
For details of implementing applications in Python for IBM Streams, including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud, see:
This package exposes SPL operators in the com.ibm.streamsx.hdfs toolkit as Python methods.
Overview¶
Provides functions to access files on HDFS.
Use this package with the following services on IBM Cloud:
Credentials¶
“Analytics Engine” credentials are defined using service credentials JSON.
The mandatory JSON elements are “user”, “password” and “webhdfs”:
{
"user": "<USER>",
"password": "<PASSWORD>",
"webhdfs": "https://<HOST>:<PORT>"
}
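The mandatory keys can be checked before submitting a topology. A minimal sketch using only the standard library; the `check_hdfs_credentials` helper and the sample values are hypothetical, not part of this package:

```python
import json

def check_hdfs_credentials(creds_json):
    """Parse service credentials and verify the mandatory keys (hypothetical helper)."""
    creds = json.loads(creds_json) if isinstance(creds_json, str) else dict(creds_json)
    missing = {'user', 'password', 'webhdfs'} - set(creds)
    if missing:
        raise ValueError('missing mandatory credential keys: %s' % sorted(missing))
    return creds

# Hypothetical sample credentials, not real service values
service_credentials = '{"user": "clsadmin", "password": "secret", "webhdfs": "https://example.com:8443"}'
creds = check_hdfs_credentials(service_credentials)
```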
If you are using HDFS servers other than the “Analytics Engine” service, you can provide a configuration file (hdfs-site.xml or core-site.xml) to configure the connection.
Sample¶
A simple hello-world example of a Streams application that writes string messages to a file on HDFS, then scans for the created file on HDFS and reads its content:
from streamsx.topology.topology import *
from streamsx.topology.schema import CommonSchema, StreamSchema
from streamsx.topology.context import submit
import streamsx.hdfs as hdfs
import json
credentials = json.load(credentials_analytics_engine_service)
topo = Topology('HDFSHelloWorld')
to_hdfs = topo.source(['Hello', 'World!'])
to_hdfs = to_hdfs.as_string()
# Write a stream to HDFS
hdfs.write(to_hdfs, credentials=credentials, file='/sample/hw.txt')
scanned = hdfs.scan(topo, credentials=credentials, directory='/sample', init_delay=10)
# read text file line by line
r = hdfs.read(scanned, credentials=credentials)
# print each line (tuple)
r.print()
submit('STREAMING_ANALYTICS_SERVICE', topo)
# Use for IBM Streams including IBM Cloud Pak for Data
# submit('DISTRIBUTED', topo)
-
class
streamsx.hdfs.
HdfsDirectoryScan
(credentials, directory, pattern=None, initDelay=None, schema=<CommonSchema.String: <streamsx.topology.schema.StreamSchema object>>, **options)¶ Watches an HDFS directory and generates a file name on the output for each file that is found in the directory.
Example, scanning for files in the testDir directory:
import streamsx.hdfs as hdfs
from streamsx.topology.topology import Topology
topo = Topology()
dir = '/user/streamsadmin/testDir'
config = {
    'initDelay': 2.0,
    'sleepTime': 2.0,
    'pattern': 'sample.*txt'
}
s = topo.source(hdfs.HdfsDirectoryScan(directory=dir, **config))
Example, scanning for files with “csv” file extension:
s = topo.source(hdfs.HdfsDirectoryScan(directory=dir, pattern='.*\.csv$'))
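The pattern is a regular expression applied to file names. The filtering can be sketched in plain Python to verify a pattern before using it; the `filter_names` helper is a hypothetical illustration, assuming the expression must match the whole file name:

```python
import re

def filter_names(names, pattern):
    # Names that do not match the regular expression are ignored
    # (a full-name match is assumed here; hypothetical helper).
    rx = re.compile(pattern)
    return [n for n in names if rx.fullmatch(n)]

names = ['sample1.txt', 'data.csv', 'notes.md', 'sample_big.txt']
csv_only = filter_names(names, r'.*\.csv$')    # pattern from the example above
samples = filter_names(names, r'sample.*txt')  # pattern from the first example
```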
-
credentials
¶ The credentials of the Hadoop cluster as dict or JSON string that contains the hdfs credentials key/value pairs for user, password and webhdfs.
- Type
dict|str
-
directory
¶ Specifies the name of the directory to be scanned
- Type
str|Expression
-
pattern
¶ Instructs the operator to ignore file names that do not match the regular expression pattern
- Type
str
-
initDelay
¶ initDelay specifies the time to wait in seconds before the operator reads the first file. The default value is 0 .
- Type
int
-
schema
¶ Output schema, defaults to CommonSchema.String
- Type
StreamSchema
-
options
¶ The additional optional parameters as variable keyword arguments.
- Type
kwargs
-
property
appConfigName
¶ The optional parameter appConfigName specifies the name of the application configuration that contains the HDFS connection related configuration parameter credentials.
- Type
str
-
property
authKeytab
¶ The optional parameter authKeytab specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.
- Type
str
-
property
authPrincipal
¶ The optional parameter authPrincipal specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the IBM Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.
- Type
str
-
property
configPath
¶ The optional parameter configPath specifies the path to the directory that contains the HDFS configuration file core-site.xml .
- Type
str
-
property
credFile
¶ The optional parameter credFile specifies a file that contains login credentials. The credentials are used to connect to WEBHDFS remotely by using the schema webhdfs://hdfshost:webhdfsport . The credentials file must be a valid JSON string and must contain the hdfs credentials key/value pairs for user, password and webhdfs in JSON format.
- Type
str
-
property
credentials
The optional parameter credentials specifies the JSON string that contains the hdfs credentials key/value pairs for user, password and webhdfs .
- Type
str
-
property
directory
This optional parameter specifies the name of the directory to be scanned. If the name starts with a slash, it is considered an absolute directory that you want to scan. If it does not start with a slash, it is considered a relative directory, relative to the /user/userid/ directory. This parameter is mandatory if the input port is not specified.
- Type
str
-
property
hdfsPassword
¶ The parameter hdfsPassword specifies the password to use when connecting to a Hadoop instance via WEBHDFS.
- Type
str
-
property
hdfsUri
¶ The parameter hdfsUri specifies the uniform resource identifier (URI) that you can use to connect to the HDFS file system.
- Type
str
-
property
hdfsUser
¶ The parameter hdfsUser specifies the user ID to use when you connect to the HDFS file system. If this parameter is not specified, the operator uses the instance owner ID to connect to HDFS.
- Type
str
-
property
initDelay
The parameter initDelay specifies the time to wait in seconds before the operator HDFS2DirectoryScan reads the first file. The default value is 0 .
- Type
float
-
property
keyStorePassword
¶ The optional parameter keyStorePassword is only supported when connecting to a WEBHDFS. It specifies the password for the keystore file.
- Type
str
-
property
keyStorePath
¶ The optional parameter keyStorePath is only supported when connecting to a WEBHDFS. It specifies the path to the keystore file, which is in PEM format. The keystore file is used when making a secure connection to the HDFS server and must contain the public certificate of the HDFS server that will be connected to.
- Type
str
-
property
libPath
¶ The optional parameter libPath specifies the absolute path to the directory that contains the Hadoop library files.
- Type
str
-
property
pattern
The optional parameter pattern limits the file names that are listed to the names that match the specified regular expression. The HDFS2DirectoryScan operator ignores file names that do not match the specified regular expression.
- Type
str
-
property
policyFilePath
¶ The optional parameter policyFilePath is relevant when connecting to IBM Analytics Engine on IBM Cloud. It specifies the path to the directory that contains the Java Cryptography Extension policy files (US_export_policy.jar and local_policy.jar).
- Type
str
-
populate
(topology, name, **options)¶ Populate the topology with this composite source.
- Parameters
topology – Topology containing the source.
name – Name passed into
source
.**options – Future options passed to
source
.
- Returns
Single stream representing the source.
- Return type
Stream
-
property
reconnectionBound
¶ The optional parameter reconnectionBound specifies the number of successive connection attempts that occur when a connection fails or a disconnect occurs. It is used only when the reconnectionPolicy parameter is set to BoundedRetry; otherwise, it is ignored. The default value is 5 .
- Type
int
-
property
reconnectionInterval
¶ The optional parameter reconnectionInterval specifies the amount of time (in seconds) that the operator waits between successive connection attempts. It is used only when the reconnectionPolicy parameter is set to BoundedRetry or InfiniteRetry; otherwise, it is ignored. The default value is 10 .
- Type
int
-
property
reconnectionPolicy
¶ The optional parameter reconnectionPolicy specifies the policy that is used by the operator to handle HDFS connection failures. The valid values are NoRetry, InfiniteRetry, and BoundedRetry. The default value is BoundedRetry .
- Type
str
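The three reconnection parameters work together: the bound only applies to BoundedRetry, and the interval only to the retry policies. A sketch of assembling and sanity-checking such a configuration; the `reconnection_config` helper is hypothetical, with defaults taken from the descriptions above:

```python
def reconnection_config(policy='BoundedRetry', bound=5, interval=10):
    # Assemble the reconnection-related kwargs (hypothetical helper);
    # reconnectionBound applies only to BoundedRetry,
    # reconnectionInterval to BoundedRetry and InfiniteRetry.
    valid = ('NoRetry', 'InfiniteRetry', 'BoundedRetry')
    if policy not in valid:
        raise ValueError('reconnectionPolicy must be one of %s' % (valid,))
    cfg = {'reconnectionPolicy': policy}
    if policy == 'BoundedRetry':
        cfg['reconnectionBound'] = bound
    if policy in ('BoundedRetry', 'InfiniteRetry'):
        cfg['reconnectionInterval'] = interval
    return cfg

cfg = reconnection_config('BoundedRetry', bound=3, interval=20)
```

The resulting dict can be passed as additional keyword arguments to the operator classes.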
-
property
sleepTime
¶ The parameter sleepTime specifies the minimum time between directory scans. The default value is 5.0 seconds.
- Type
float
-
property
strictMode
¶ The parameter strictMode determines whether the operator reports an error if the directory to be scanned does not exist. The default value is false.
- Type
bool
-
property
vmArg
¶ The optional parameter vmArg specifies additional JVM arguments that are required by the specific invocation of the operator.
- Type
str
-
-
class
streamsx.hdfs.
HdfsFileSink
(credentials, file, **options)¶ Writes a stream to a file on HDFS.
Note
Only the last component of the path name is created if it does not exist. All directories in the path name up to the last component must exist.
Example for writing a stream to a file:
import streamsx.hdfs as hdfs
from streamsx.topology.topology import Topology
topo = Topology()
s = topo.source(['Hello', 'World!']).as_string()
s.for_each(hdfs.HdfsFileSink(credentials=credentials, file='/user/hdfs/data.txt'))
Example with parameters specified as kwargs and the name of the output file constructed with the %FILENUM variable:
import streamsx.spl.op
config = {
    'hdfsUser': 'hdfs',
    'tuplesPerFile': 50000
}
fsink = hdfs.HdfsFileSink(file=streamsx.spl.op.Expression.expression('pytest1/sample4%FILENUM.txt'), **config)
to_file.for_each(fsink)
-
credentials
¶ The credentials of the Hadoop cluster as dict or JSON string that contains the hdfs credentials key/value pairs for user, password and webhdfs.
- Type
dict|str
-
file
¶ Name of the output file.
- Type
str
-
options
¶ The additional optional parameters as variable keyword arguments.
- Type
kwargs
-
property
appConfigName
¶ The optional parameter appConfigName specifies the name of the application configuration that contains the HDFS connection related configuration parameter credentials.
- Type
str
-
property
authKeytab
¶ The optional parameter authKeytab specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.
- Type
str
-
property
authPrincipal
¶ The optional parameter authPrincipal specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the IBM Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.
- Type
str
-
property
bytesPerFile
¶ This parameter specifies the approximate size of the output file, in bytes. When the file size exceeds the specified number of bytes, the current output file is closed and a new file is opened. The bytesPerFile, timePerFile, and tuplesPerFile parameters are mutually exclusive; you can specify only one of these parameters at a time.
- Type
int
-
property
closeOnPunct
¶ This parameter specifies whether the operator closes the current output file and creates a new file when a punctuation marker is received. The default value is false .
- Type
bool
-
property
configPath
¶ The optional parameter configPath specifies the path to the directory that contains the HDFS configuration file core-site.xml .
- Type
str
-
property
credFile
¶ The optional parameter credFile specifies a file that contains login credentials. The credentials are used to connect to WEBHDFS remotely by using the schema webhdfs://hdfshost:webhdfsport . The credentials file must be a valid JSON string and must contain the hdfs credentials key/value pairs for user, password and webhdfs in JSON format.
- Type
str
-
property
credentials
The optional parameter credentials specifies the JSON string that contains the hdfs credentials key/value pairs for user, password and webhdfs .
- Type
str
-
property
encoding
¶ This optional parameter specifies the character set encoding that is used in the output file. The default value is UTF-8 .
- Type
str
-
property
file
This parameter specifies the name of the file that the operator writes to. The file parameter can optionally contain the following variables, which the operator evaluates at runtime to generate the file name:
%HOST The host that is running the processing element (PE) of this operator.
%FILENUM The file number, which starts at 0 and counts up as a new file is created for writing.
%PROCID The process ID of the processing element.
%PEID The processing element ID.
%PELAUNCHNUM The PE launch count.
%TIME The time when the file is created. If the timeFormat parameter is not specified, the default time format is yyyyMMdd_HHmmss .
- Type
str
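How the file name variables expand can be emulated in plain Python. This is only an illustrative sketch; the operator performs the substitution itself at runtime, the `expand_file_name` helper and all stand-in values are hypothetical, and `'%Y%m%d_%H%M%S'` is the strftime equivalent of the default yyyyMMdd_HHmmss format:

```python
import time

def expand_file_name(template, host='host1', filenum=0, procid=123,
                     peid=7, launchnum=1, time_format='%Y%m%d_%H%M%S'):
    # Hypothetical stand-ins for the runtime values the operator supplies.
    subs = {
        '%HOST': host,
        '%FILENUM': str(filenum),
        '%PROCID': str(procid),
        '%PEID': str(peid),
        '%PELAUNCHNUM': str(launchnum),
        '%TIME': time.strftime(time_format),
    }
    for var, value in subs.items():
        template = template.replace(var, value)
    return template

name = expand_file_name('output_%HOST_%FILENUM.txt', filenum=3)
```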
-
property
fileAttributeName
¶ If set, this points to an attribute containing the file name. The operator closes a file when the value of this attribute changes. If the string contains substitutions, the check for a change happens before the substitutions are applied, and the file name contains the substitutions based on the first tuple.
- Type
str
-
property
hdfsPassword
¶ The parameter hdfsPassword specifies the password to use when connecting to a Hadoop instance via WEBHDFS.
- Type
str
-
property
hdfsUri
¶ The parameter hdfsUri specifies the uniform resource identifier (URI) that you can use to connect to the HDFS file system.
- Type
str
-
property
hdfsUser
¶ The parameter hdfsUser specifies the user ID to use when you connect to the HDFS file system. If this parameter is not specified, the operator uses the instance owner ID to connect to HDFS.
- Type
str
-
property
keyStorePassword
¶ The optional parameter keyStorePassword is only supported when connecting to a WEBHDFS. It specifies the password for the keystore file.
- Type
str
-
property
keyStorePath
¶ The optional parameter keyStorePath is only supported when connecting to a WEBHDFS. It specifies the path to the keystore file, which is in PEM format. The keystore file is used when making a secure connection to the HDFS server and must contain the public certificate of the HDFS server that will be connected to.
- Type
str
-
property
libPath
¶ The optional parameter libPath specifies the absolute path to the directory that contains the Hadoop library files.
- Type
str
-
property
policyFilePath
¶ The optional parameter policyFilePath is relevant when connecting to IBM Analytics Engine on IBM Cloud. It specifies the path to the directory that contains the Java Cryptography Extension policy files (US_export_policy.jar and local_policy.jar).
- Type
str
-
populate
(topology, stream, name, **options) → streamsx.topology.topology.Sink¶ Populate the topology with this composite for each transformation.
- Parameters
topology – Topology containing the composite map.
stream – Stream to be transformed.
name – Name passed into
for_each
.**options – Future options passed to
for_each
.
- Returns
Termination for this composite transformation of stream.
- Return type
Sink
-
property
reconnectionBound
¶ The optional parameter reconnectionBound specifies the number of successive connection attempts that occur when a connection fails or a disconnect occurs. It is used only when the reconnectionPolicy parameter is set to BoundedRetry; otherwise, it is ignored. The default value is 5 .
- Type
int
-
property
reconnectionInterval
¶ The optional parameter reconnectionInterval specifies the amount of time (in seconds) that the operator waits between successive connection attempts. It is used only when the reconnectionPolicy parameter is set to BoundedRetry or InfiniteRetry; otherwise, it is ignored. The default value is 10 .
- Type
int
-
property
reconnectionPolicy
¶ The optional parameter reconnectionPolicy specifies the policy that is used by the operator to handle HDFS connection failures. The valid values are NoRetry, InfiniteRetry, and BoundedRetry. The default value is BoundedRetry .
- Type
str
-
property
tempFile
¶ This parameter specifies the name of the file that the operator writes to. When the file is closed the file is renamed to the final filename defined by the file parameter or fileAttributeName parameter.
- Type
str
-
property
timeFormat
¶ This parameter specifies the time format to use when the file parameter value contains %TIME . The parameter value must contain conversion specifications that are supported by the java.text.SimpleDateFormat. The default format is yyyyMMdd_HHmmss .
- Type
str
-
property
timePerFile
¶ This parameter specifies the approximate time, in seconds, after which the current output file is closed and a new file is opened for writing. The bytesPerFile, timePerFile, and tuplesPerFile parameters are mutually exclusive; you can specify only one of these parameters.
- Type
float
-
property
tuplesPerFile
¶ This parameter specifies the maximum number of tuples that can be received for each output file. When the specified number of tuples are received, the current output file is closed and a new file is opened for writing. The bytesPerFile, timePerFile, and tuplesPerFile parameters are mutually exclusive; you can specify only one of these parameters at a time.
- Type
int
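Because bytesPerFile, timePerFile and tuplesPerFile are mutually exclusive, a configuration dict can be checked before handing it to the sink. A small sketch; the `check_rotation_config` helper is hypothetical and not part of this package:

```python
def check_rotation_config(config):
    # The three file-rotation parameters are mutually exclusive
    # (hypothetical validation helper).
    exclusive = ('bytesPerFile', 'timePerFile', 'tuplesPerFile')
    given = [p for p in exclusive if p in config]
    if len(given) > 1:
        raise ValueError('only one of %s may be set, got %s' % (exclusive, given))
    return config

ok = check_rotation_config({'hdfsUser': 'hdfs', 'tuplesPerFile': 50000})
```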
-
property
vmArg
¶ The optional parameter vmArg specifies additional JVM arguments that are required by the specific invocation of the operator.
- Type
str
-
-
class
streamsx.hdfs.
HdfsFileSource
(credentials, schema=<CommonSchema.String: <streamsx.topology.schema.StreamSchema object>>, **options)¶ Reads HDFS files given by input stream and generates tuples with the file content on the output stream.
Example, scanning for HDFS files in the pytest directory and reading the files via HdfsFileSource:
import streamsx.hdfs as hdfs
dir_scan_output_schema = StreamSchema('tuple<rstring fileName>')
dirScanParameters = {
    'initDelay': 2.0,
    'sleepTime': 2.0,
    'pattern': 'sample.*txt'
}
# HdfsDirectoryScan scans the directory 'pytest' and delivers HDFS file names on the output port.
scannedFileNames = topo.source(hdfs.HdfsDirectoryScan(credentials=hdfs_cfg_file, directory='pytest', schema=dir_scan_output_schema, **dirScanParameters))
scannedFileNames.print()
sourceParameters = {
    'initDelay': 1.0
}
source_schema = StreamSchema('tuple<rstring line>')
# HdfsFileSource reads the HDFS files in directory 'pytest' and returns the lines of the files on the output port.
readLines = scannedFileNames.map(hdfs.HdfsFileSource(credentials=hdfs_cfg_file, schema=source_schema, **sourceParameters))
-
credentials
¶ The credentials of the Hadoop cluster as dict or JSON string that contains the hdfs credentials key/value pairs for user, password and webhdfs.
- Type
dict|str
-
schema
¶ Output schema, defaults to CommonSchema.String
- Type
StreamSchema
-
options
¶ The additional optional parameters as variable keyword arguments.
- Type
kwargs
-
property
appConfigName
¶ The optional parameter appConfigName specifies the name of the application configuration that contains the HDFS connection related configuration parameter credentials.
- Type
str
-
property
authKeytab
¶ The optional parameter authKeytab specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.
- Type
str
-
property
authPrincipal
¶ The optional parameter authPrincipal specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the IBM Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.
- Type
str
-
property
blockSize
¶ The optional parameter blockSize specifies the maximum number of bytes to be read at one time when reading a file in binary mode (i.e., into a blob); thus, it is the maximum size of the blobs on the output stream. The parameter is optional and defaults to 4096 .
- Type
int
-
property
configPath
¶ The optional parameter configPath specifies the path to the directory that contains the HDFS configuration file core-site.xml .
- Type
str
-
property
credFile
¶ The optional parameter credFile specifies a file that contains login credentials. The credentials are used to connect to WEBHDFS remotely by using the schema webhdfs://hdfshost:webhdfsport . The credentials file must be a valid JSON string and must contain the hdfs credentials key/value pairs for user, password and webhdfs in JSON format.
- Type
str
-
property
credentials
The optional parameter credentials specifies the JSON string that contains the hdfs credentials key/value pairs for user, password and webhdfs .
- Type
str
-
property
encoding
¶ This optional parameter specifies the encoding to use when reading files. The default value is UTF-8 .
- Type
str
-
property
file
¶ This parameter specifies the name of the file that the operator opens and reads. This parameter must be specified when the optional input port is not configured. If the optional input port is used and the file name is specified, the operator generates an error.
- Type
str
-
property
hdfsPassword
¶ The parameter hdfsPassword specifies the password to use when connecting to a Hadoop instance via WEBHDFS.
- Type
str
-
property
hdfsUri
¶ The parameter hdfsUri specifies the uniform resource identifier (URI) that you can use to connect to the HDFS file system.
- Type
str
-
property
hdfsUser
¶ The parameter hdfsUser specifies the user ID to use when you connect to the HDFS file system. If this parameter is not specified, the operator uses the instance owner ID to connect to HDFS.
- Type
str
-
property
initDelay
¶ The parameter initDelay specifies the time to wait in seconds before the operator HDFS2FileSource reads the first file. The default value is 0 .
- Type
float
-
property
keyStorePassword
¶ The optional parameter keyStorePassword is only supported when connecting to a WEBHDFS. It specifies the password for the keystore file.
- Type
str
-
property
keyStorePath
¶ The optional parameter keyStorePath is only supported when connecting to a WEBHDFS. It specifies the path to the keystore file, which is in PEM format. The keystore file is used when making a secure connection to the HDFS server and must contain the public certificate of the HDFS server that will be connected to.
- Type
str
-
property
libPath
¶ The optional parameter libPath specifies the absolute path to the directory that contains the Hadoop library files.
- Type
str
-
property
policyFilePath
¶ The optional parameter policyFilePath is relevant when connecting to IBM Analytics Engine on IBM Cloud. It specifies the path to the directory that contains the Java Cryptography Extension policy files (US_export_policy.jar and local_policy.jar).
- Type
str
-
populate
(topology, stream, schema, name, **options)¶ Populate the topology with this composite map transformation.
- Parameters
topology – Topology containing the composite map.
stream – Stream to be transformed.
schema – Schema passed into
map
.name – Name passed into
map
.**options – Future options passed to
map
.
- Returns
Single stream representing the transformation of stream.
- Return type
Stream
-
property
reconnectionBound
¶ The optional parameter reconnectionBound specifies the number of successive connection attempts that occur when a connection fails or a disconnect occurs. It is used only when the reconnectionPolicy parameter is set to BoundedRetry; otherwise, it is ignored. The default value is 5 .
- Type
int
-
property
reconnectionInterval
¶ The optional parameter reconnectionInterval specifies the amount of time (in seconds) that the operator waits between successive connection attempts. It is used only when the reconnectionPolicy parameter is set to BoundedRetry or InfiniteRetry; otherwise, it is ignored. The default value is 10 .
- Type
int
-
property
reconnectionPolicy
¶ The optional parameter reconnectionPolicy specifies the policy that is used by the operator to handle HDFS connection failures. The valid values are NoRetry, InfiniteRetry, and BoundedRetry. The default value is BoundedRetry .
- Type
str
-
property
vmArg
¶ The optional parameter vmArg specifies additional JVM arguments that are required by the specific invocation of the operator.
- Type
str
-
-
class
streamsx.hdfs.
HdfsFileCopy
(credentials, direction, schema=<CommonSchema.String: <streamsx.topology.schema.StreamSchema object>>, **options)¶ Copies files between the HDFS file system and the local disk, with the file names given by the input stream, and generates a result tuple on the output stream for each copied file.
Example, scanning for HDFS files in the pytest directory and copying them to the local disk via HdfsFileCopy:
import streamsx.hdfs as hdfs
dir_scan_output_schema = StreamSchema('tuple<rstring hdfsFileName>')
dirScanParameters = {
    'initDelay': 2.0,
    'sleepTime': 2.0,
    'pattern': 'sample.*txt'
}
# HdfsDirectoryScan scans the directory 'pytest' and delivers HDFS file names on the output port.
scannedFileNames = topo.source(hdfs.HdfsDirectoryScan(credentials=credentials, directory='pytest', schema=dir_scan_output_schema, **dirScanParameters))
scannedFileNames.print()
fileCopyParameters = {
    'hdfsFileAttrName': 'hdfsFileName',
    'localFile': '/tmp/'
}
output_schema = StreamSchema('tuple<rstring result, uint64 numResults>')
# HdfsFileCopy copies HDFS files from the directory 'pytest' into the local directory /tmp.
copyFiles = scannedFileNames.map(hdfs.HdfsFileCopy(credentials=hdfs_cfg_file, direction='copyToLocalFile', schema=output_schema, **fileCopyParameters))
-
credentials
¶ The credentials of the Hadoop cluster as dict or JSON string that contains the hdfs credentials key/value pairs for user, password and webhdfs.
- Type
dict|str
-
direction
¶ This parameter specifies the direction of the copy. The parameter can be set with the following values: ‘copyFromLocalFile’ copies a file from the local disk to the HDFS file system; ‘copyToLocalFile’ copies a file from the HDFS file system to the local disk.
- Type
str
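The meaning of the two direction values can be sketched as a mapping from direction to a (source, destination) pair; the `copy_endpoints` helper and the sample paths are hypothetical illustrations, not part of this package:

```python
def copy_endpoints(direction, hdfs_file, local_file):
    # Map the direction value to (source, destination) as described above
    # (hypothetical helper).
    if direction == 'copyFromLocalFile':
        return (local_file, hdfs_file)
    if direction == 'copyToLocalFile':
        return (hdfs_file, local_file)
    raise ValueError("direction must be 'copyFromLocalFile' or 'copyToLocalFile'")

src, dst = copy_endpoints('copyToLocalFile', '/user/hdfs/data.txt', '/tmp/data.txt')
```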
-
schema
¶ Output schema, defaults to CommonSchema.String
- Type
StreamSchema
-
options
¶ The additional optional parameters as variable keyword arguments.
- Type
kwargs
-
property
appConfigName
¶ The optional parameter appConfigName specifies the name of the application configuration that contains the HDFS connection related configuration parameter credentials.
- Type
str
-
property
authKeytab
¶ The optional parameter authKeytab specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.
- Type
str
-
property
authPrincipal
¶ The optional parameter authPrincipal specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the IBM Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.
- Type
str
-
property
blockSize
¶ The optional parameter blockSize specifies the maximum number of bytes to be read at one time when reading a file in binary mode (i.e., into a blob); thus, it is the maximum size of the blobs on the output stream. The parameter is optional and defaults to 4096 .
- Type
int
-
property
configPath
¶ The optional parameter configPath specifies the path to the directory that contains the HDFS configuration file core-site.xml .
- Type
str
-
property
credFile
¶ The optional parameter credFile specifies a file that contains login credentials. The credentials are used to connect to WEBHDFS remotely by using the schema webhdfs://hdfshost:webhdfsport . The credentials file must be a valid JSON string and must contain the hdfs credentials key/value pairs for user, password and webhdfs in JSON format.
- Type
str
-
property
credentials
The optional parameter credentials specifies the JSON string that contains the hdfs credentials key/value pairs for user, password and webhdfs .
- Type
str
-
property
deleteSourceFile
¶ The optional parameter deleteSourceFile specifies whether to delete the source file when processing is finished.
- Type
bool
-
property
direction
This parameter specifies the direction of the copy. The parameter can be set with the following values: copyFromLocalFile copies a file from the local disk to the HDFS file system; copyToLocalFile copies a file from the HDFS file system to the local disk.
- Type
str
-
property
hdfsFile
¶ This optional parameter specifies the name of an HDFS file or directory. If the name starts with a slash, it is considered an absolute path of the HDFS file that you want to use. If it does not start with a slash, it is considered a relative path, relative to /user/userid/hdfsFile . If you want to copy all incoming files from the input port to a directory, set the value of direction to copyFromLocalFile and set the value of this parameter to a directory with a slash at the end, e.g. /user/userid/testDir/ . This parameter is mandatory if hdfsFileAttrName is not specified. The parameter hdfsFile cannot be set when the parameter hdfsFileAttrName is set.
- Type
str
-
property
hdfsFileAttrName
¶ This optional parameter specifies the attribute of the input stream that provides the value of hdfsFile. If the name starts with a slash, it is considered an absolute path of the HDFS file that you want to copy. If it does not start with a slash, it is considered a relative path, relative to /user/userid/hdfsFile . This parameter is mandatory if hdfsFile is not specified. The parameter hdfsFileAttrName cannot be set when the parameter hdfsFile is set.
- Type
str
-
property
hdfsPassword
¶ The parameter hdfsPassword specifies the password to use when connecting to a Hadoop instance via WEBHDFS.
- Type
str
-
property
hdfsUri
¶ The parameter hdfsUri specifies the uniform resource identifier (URI) that you can use to connect to the HDFS file system.
- Type
str
-
property
hdfsUser
¶ The parameter hdfsUser specifies the user ID to use when you connect to the HDFS file system. If this parameter is not specified, the operator uses the instance owner ID to connect to HDFS.
- Type
str
-
property
keyStorePassword
¶ The optional parameter keyStorePassword is only supported when connecting to a WEBHDFS. It specifies the password for the keystore file.
- Type
str
-
property
keyStorePath
¶ The optional parameter keyStorePath is only supported when connecting to a WEBHDFS. It specifies the path to the keystore file, which is in PEM format. The keystore file is used when making a secure connection to the HDFS server and must contain the public certificate of the HDFS server that will be connected to.
- Type
str
-
property
libPath
¶ The optional parameter libPath specifies the absolute path to the directory that contains the Hadoop library files.
- Type
str
-
property
localFile
¶ The optional parameter localFile specifies the name of the local file to be copied. If the name starts with a slash, it is considered an absolute path of the local file that you want to copy. If it does not start with a slash, it is considered a relative path, relative to your project data directory. If you want to copy all incoming files from the input port to a directory, set the value of direction to copyToLocalFile and set the value of this parameter to a directory with a slash at the end, e.g. data/testDir/ . This parameter is mandatory if localFileAttrName is not specified. The parameter localFile cannot be set when the parameter localFileAttrName is set.
- Type
str
-
property
localFileAttrName
¶ The optional parameter localFileAttrName specifies the attribute of the input stream that provides the value of localFile. If the name starts with a slash, it is considered an absolute path of the local file that you want to copy. If it does not start with a slash, it is considered a relative path, relative to your project data directory. This parameter is mandatory if localFile is not specified. The parameter localFileAttrName cannot be set when the parameter localFile is set.
- Type
str
-
property
overwriteDestinationFile
¶ The optional parameter overwriteDestinationFile specifies whether to overwrite the destination file.
- Type
bool
-
property
policyFilePath
¶ The optional parameter policyFilePath is relevant when connecting to IBM Analytics Engine on IBM Cloud. It specifies the path to the directory that contains the Java Cryptography Extension policy files (US_export_policy.jar and local_policy.jar).
- Type
str
-
populate
(topology, stream, schema, name, **options)¶ Populate the topology with this composite map transformation.
- Parameters
topology – Topology containing the composite map.
stream – Stream to be transformed.
schema – Schema passed into
map
.
name – Name passed into
map
.
**options – Future options passed to
map
.
- Returns
Single stream representing the transformation of stream.
- Return type
Stream
-
property
reconnectionBound
¶ The optional parameter reconnectionBound specifies the number of successive connection attempts that occur when a connection fails or a disconnect occurs. It is used only when the reconnectionPolicy parameter is set to BoundedRetry; otherwise, it is ignored. The default value is 5 .
- Type
int
-
property
reconnectionInterval
¶ The optional parameter reconnectionInterval specifies the amount of time (in seconds) that the operator waits between successive connection attempts. It is used only when the reconnectionPolicy parameter is set to BoundedRetry or InfiniteRetry; otherwise, it is ignored. The default value is 10 .
- Type
int
-
property
reconnectionPolicy
¶ The optional parameter reconnectionPolicy specifies the policy that is used by the operator to handle HDFS connection failures. The valid values are NoRetry, InfiniteRetry, and BoundedRetry. The default value is BoundedRetry.
- Type
str
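The interplay of reconnectionPolicy, reconnectionBound, and reconnectionInterval can be sketched as a plain retry loop. This is an illustration only: the operator implements this logic internally, and `connect_with_retry` is not part of the streamsx.hdfs API.

```python
import time

def connect_with_retry(connect, policy="BoundedRetry", bound=5, interval=10):
    """Retry loop mirroring the reconnection semantics described above.

    Illustration only: a sketch of the documented behavior, not the
    toolkit's actual implementation.
    """
    attempts = 0
    while True:
        try:
            return connect()
        except ConnectionError:
            attempts += 1
            if policy == "NoRetry":
                raise                       # fail immediately
            if policy == "BoundedRetry" and attempts > bound:
                raise                       # retry budget exhausted
            time.sleep(interval)            # wait reconnectionInterval seconds
```

With the defaults this mirrors the documented behavior: up to 5 retries, 10 seconds apart, before the failure is surfaced.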
-
property
vmArg
¶ The optional parameter vmArg specifies additional JVM arguments that are required by the specific invocation of the operator.
- Type
str
-
-
streamsx.hdfs.
download_toolkit
(url=None, target_dir=None)¶ Downloads the latest HDFS toolkit from GitHub.
Example for updating the HDFS toolkit for your topology with the latest toolkit from GitHub:
import streamsx.hdfs as hdfs

# download HDFS toolkit from GitHub
hdfs_toolkit_location = hdfs.download_toolkit()
# add the toolkit to topology
streamsx.spl.toolkit.add_toolkit(topology, hdfs_toolkit_location)
Example for updating the topology with a specific version of the HDFS toolkit using a URL:
import streamsx.hdfs as hdfs

url500 = 'https://github.com/IBMStreams/streamsx.hdfs/releases/download/v5.0.0/streamx.hdfs.toolkits-5.0.0-20190902-1637.tgz'
hdfs_toolkit_location = hdfs.download_toolkit(url=url500)
streamsx.spl.toolkit.add_toolkit(topology, hdfs_toolkit_location)
- Parameters
url (str) – Link to toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.
target_dir (str) – the directory where the toolkit is unpacked to. If a relative path is given, the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems. If target_dir is
None
a location relative to the system temporary directory is chosen.
- Returns
the location of the downloaded HDFS toolkit
- Return type
str
Note
This function requires an outgoing Internet connection.
New in version 1.1.
-
streamsx.hdfs.
configure_connection
(instance, name='hdfs', credentials=None)¶ Configures IBM Streams for a certain connection.
Creates or updates an application configuration object containing the required properties with connection information.
Example for creating a configuration for a Streams instance with connection details:
from icpd_core import icpd_util
from streamsx.rest_primitives import Instance
import streamsx.hdfs as hdfs

cfg = icpd_util.get_service_instance_details(name='your-streams-instance')
cfg[context.ConfigParams.SSL_VERIFY] = False
instance = Instance.of_service(cfg)
app_cfg = hdfs.configure_connection(instance, credentials='my_credentials_json')
- Parameters
instance (streamsx.rest_primitives.Instance) – IBM Streams instance object.
name (str) – Name of the application configuration, default name is ‘hdfs’.
credentials (str|dict) – The service credentials, for example Analytics Engine service credentials.
- Returns
Name of the application configuration.
-
streamsx.hdfs.
scan
(topology, credentials, directory, pattern=None, init_delay=None, name=None)¶ Scans a Hadoop Distributed File System directory for new or modified files.
Repeatedly scans an HDFS directory and writes the names of new or modified files that are found in the directory to the output stream.
- Parameters
topology (Topology) – Topology to contain the returned stream.
credentials (dict|str|file) – The credentials of the IBM Cloud Analytics Engine service in JSON (dict) or as a JSON string (str), or the path to a configuration file (
hdfs-site.xml
or
core-site.xml
). If a configuration file is specified, then this file is copied to the ‘etc’ directory of the application bundle.
directory (str) – The directory to be scanned. A relative path is relative to the ‘/user/userid/’ directory.
pattern (str) – Limits the file names that are listed to the names that match the specified regular expression.
init_delay (int|float|datetime.timedelta) – The time to wait in seconds before the operator scans the directory for the first time. If not set, then the default value is 0.
schema (Schema) – Optional output stream schema. Default is
CommonSchema.String
. Alternatively, a structured streams schema with a single attribute of type
rstring
is supported.
name (str) – Source name in the Streams context, defaults to a generated name.
- Returns
Output Stream containing file names with schema
DirectoryScanSchema
.
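The effect of the `pattern` parameter can be sketched as a regular-expression filter over the scanned file names. Illustration only: the filtering happens inside the toolkit's directory-scan operator, `matching_files` is not part of the streamsx.hdfs API, and `re.match` semantics are an assumption here.

```python
import re

def matching_files(file_names, pattern=None):
    """Sketch of how the ``pattern`` parameter narrows a scan result.

    Illustration only: a hypothetical helper; the real filtering is
    done by the directory-scan operator on HDFS.
    """
    if pattern is None:
        return list(file_names)       # no pattern: every file name is emitted
    rx = re.compile(pattern)
    return [name for name in file_names if rx.match(name)]
```

For example, `pattern=r".*\.csv"` would limit the output stream to CSV file names.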
-
streamsx.hdfs.
read
(stream, credentials, schema=<CommonSchema.String: <streamsx.topology.schema.StreamSchema object>>, name=None)¶ Reads files from a Hadoop Distributed File System.
Filenames of the files to be read are part of the input stream.
- Parameters
stream (Stream) – Stream of tuples containing file names to be read. Supports
CommonSchema.String
as input. Alternative a structured streams schema with a single attribute of typerstring
is supported.credentials (dict|str|file) – The credentials of the IBM cloud Analytics Engine service in JSON (idct) or JSON string (str) or the path to the configuration file (
hdfs-site.xml
orcore-site.xml
). If the configuration file is specified, then this file will be copied to the ‘etc’ directory of the application bundle.schema (Schema) – Output schema for the file content, defaults to
CommonSchema.String
. Alternatively, a structured streams schema with a single attribute of type
rstring
or
blob
is supported.
name (str) – Name of the operator in the Streams context, defaults to a generated name.
- Returns
Output Stream for file content. Default output schema is
CommonSchema.String
(line per file).
-
streamsx.hdfs.
write
(stream, credentials, file=None, fileAttributeName=None, schema=None, timePerFile=None, tuplesPerFile=None, bytesPerFile=None, name=None)¶ Writes files to a Hadoop Distributed File System.
If a file with the same name already exists on HDFS, it is overwritten. By default, the file is closed when a window punctuation mark is received. Different close modes can be specified with the parameters:
timePerFile
,tuplesPerFile
,bytesPerFile
Example with input stream of type
CommonSchema.String
import streamsx.hdfs as hdfs

s = topo.source(['Hello World!']).as_string()
result = hdfs.write(s, credentials=credentials, file='sample%FILENUM.txt')
result.print()
- Parameters
stream (Stream) – Stream of tuples containing the data to be written to files. Supports
CommonSchema.String
as input. Alternatively, a structured streams schema with a single attribute of type
rstring
or
blob
is supported.
credentials (dict|str|file) – The credentials of the IBM Cloud Analytics Engine service in JSON (dict) or as a JSON string (str), or the path to a configuration file (
hdfs-site.xml
or
core-site.xml
). If a configuration file is specified, then this file is copied to the ‘etc’ directory of the application bundle.
file (str) –
Specifies the name of the file. The file parameter can optionally contain the following variables, which are evaluated at runtime to generate the file name:
%FILENUM The file number, which starts at 0 and counts up as a new file is created for writing.
%TIME The time when the file is created. The time format is yyyyMMdd_HHmmss.
Important: If the %FILENUM or %TIME specification is not included, the file is overwritten every time a new file is created.
timePerFile (int|float|datetime.timedelta) – Specifies the approximate time, in seconds, after which the current output file is closed and a new file is opened for writing. The
bytesPerFile
,
timePerFile
and
tuplesPerFile
parameters are mutually exclusive.
tuplesPerFile (int) – The maximum number of tuples that can be received for each output file. When the specified number of tuples is received, the current output file is closed and a new file is opened for writing. The
bytesPerFile
,
timePerFile
and
tuplesPerFile
parameters are mutually exclusive.
bytesPerFile (int) – Approximate size of the output file, in bytes. When the file size exceeds the specified number of bytes, the current output file is closed and a new file is opened for writing. The
bytesPerFile
,
timePerFile
and
tuplesPerFile
parameters are mutually exclusive.
name (str) – Sink name in the Streams context, defaults to a generated name.
- Returns
Output Stream with schema
FileInfoSchema
.
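The %FILENUM and %TIME substitution described for the `file` parameter can be sketched as plain string replacement. Illustration only: the substitution is performed by the HDFS sink operator at runtime, and `expand_file_name` is a hypothetical helper, not part of the streamsx.hdfs API.

```python
from datetime import datetime

def expand_file_name(template, filenum, now=None):
    """Expand the %FILENUM and %TIME variables in a ``file`` template.

    Illustration only: mirrors the documented yyyyMMdd_HHmmss time
    format; not the toolkit's actual implementation.
    """
    stamp = (now or datetime.now()).strftime("%Y%m%d_%H%M%S")
    return template.replace("%FILENUM", str(filenum)).replace("%TIME", stamp)
```

For example, a template of 'sample%FILENUM.txt' yields 'sample0.txt', 'sample1.txt', and so on as new files are opened.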