API Reference

Details about the libraries made available to tests through use of the StreamSets Test Framework are documented below. These APIs can be divided into those related to StreamSets Data Collector, StreamSets Control Hub, Environments, and Utility functions.

StreamSets Data Collector

Main interface

Abstractions for interacting with StreamSets Data Collector.

This module enables deployment of Docker-based instances of SDC as well as convenience methods for interacting with Data Collector deployments.

streamsets.testframework.sdc.DEFAULT_ENABLE_KERBEROS = False
streamsets.testframework.sdc.DEFAULT_NETWORK = 'cluster'
streamsets.testframework.sdc.DEFAULT_SDC_ALWAYS_PULL = False
streamsets.testframework.sdc.DEFAULT_SDC_PASSWORD = 'admin'
streamsets.testframework.sdc.DEFAULT_SDC_TEAR_DOWN_ON_EXIT = True
streamsets.testframework.sdc.DEFAULT_SDC_USERNAME = 'admin'
streamsets.testframework.sdc.DEFAULT_SDC_VERSION = '3.3.0'
streamsets.testframework.sdc.DEFAULT_START_STATUSES_TO_WAIT_FOR = ['RUNNING', 'FINISHED']
class streamsets.testframework.sdc.DataCollector(version=None, username='admin', password='admin', log_level=None, server_url=None, control_hub=None, tear_down_on_exit=None, always_pull=None, enable_kerberos=None, https=None, user_libs=None, network='cluster')[source]

Class to interact with StreamSets Data Collector.

The discrepancy between the default values of the initializer in the auto-generated method signature and the values below is owed to logic used to convert the arguments into instance attributes.

Parameters:
  • version (str) – SDC version.
  • username (str) – SDC username. This is used when interacting with the SDC REST API, and the user must already exist. Default: DEFAULT_SDC_USERNAME
  • password (str) – SDC password. Default: DEFAULT_SDC_PASSWORD
  • server_url (str) – URL of an existing SDC deployment with which to interact. If not set, the Test Framework will manage a Docker-based Data Collector instance. Default: None
  • log_level (str) – SDC log level.
  • tear_down_on_exit (bool) – whether to invoke the instance’s tear_down method in the context manager __exit__. Can also be used outside of context managers in checks for whether to keep Docker-based SDC instances after they’re done being used. Default: DEFAULT_SDC_TEAR_DOWN_ON_EXIT
  • always_pull (bool) – whether to always look for new SDC/stage lib Docker images. Default: DEFAULT_SDC_ALWAYS_PULL
  • network (str) – Docker network to use.
SDC_JAVA_OPTS

Get the SDC_JAVA_OPTS environment variable to be used by a Docker-based Data Collector instance.

add_environment_lib(*environment_libs)[source]

Add environment libraries to the DataCollector instance.

Parameters:environment_libs – One or more environment libraries.
add_stage_lib(*stage_libs)[source]

Add one or more stage libraries to the DataCollector instance.

Args passed to this method will be checked against sdc.properties’ library aliases before being added.

add_user(user, password=None, roles=None, groups=None)[source]

Add a user to SDC’s form-realm.properties file.

Parameters:
  • user (str) – User to add.
  • password (str, optional) – Password for user. Default: same as user
  • roles (list, optional) – List of roles to assign to user. Default: None
  • groups (list, optional) – List of groups to make user a member of. Default: None
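The documented defaults can be sketched as a small stand-in (the helper name resolve_user_args is illustrative, not part of the framework):

```python
def resolve_user_args(user, password=None, roles=None, groups=None):
    """Model the documented add_user defaults: the password falls back
    to the user name; roles and groups fall back to empty lists."""
    return {
        'user': user,
        'password': password if password is not None else user,
        'roles': roles if roles is not None else [],
        'groups': groups if groups is not None else [],
    }

defaults = resolve_user_args('admin2')
```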
add_user_stage_lib(user_libs)[source]

Add a user stage library to the DataCollector instance.

Args passed to this method will be checked against sdc.properties’ library aliases before being added.

configure_for_environment(*environments)[source]

Set DataCollector configurations based on one or more environments.

This method will update:
  • stage libraries (as specified in the environment’s sdc_stage_libs attribute),
  • SDC properties (as specified in the environment’s sdc_configurations attribute),
  • SDC resources (as specified in the environment’s sdc_resources attribute),
  • the SDC security policy file (as specified in the environment’s sdc_security_policy attribute).
It will also append to SDC_JAVA_OPTS (as specified in the environment’s SDC_JAVA_OPTS attribute) and add to the list of shell commands to run in the SDC Docker container before SDC starts (as defined in the environment’s sdc_docker_shell_commands attribute).

Parameters:environments – One or more environments.
configure_for_pipeline(*pipelines)[source]

Set DataCollector configurations based on one or more pipelines.

This method will update stage libraries (as specified in the pipeline stages’ library attributes).

Parameters:pipelines – One or more pipelines.
enable_sch(sch)[source]

Enables DPM by getting an application token from DPM.

Parameters:sch – An instance of streamsets.ControlHub.

get_pipeline_builder()[source]

Get a PipelineBuilder instance with which a Pipeline can be created.

The Test Framework uses its own subclass of PipelineBuilder in order to have it return the test-centric version of the Pipeline class needed for testing (e.g. one with a configure_for_environment method).

Returns:An instance of streamsets.testframework.sdc_models.PipelineBuilder
start()[source]

Start the DataCollector.

tear_down()[source]

Tear down a Docker-based instance of SDC created with DataCollector.start().
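As a rough model of the lifecycle described above, tear_down_on_exit controls whether tear_down runs when the context manager exits. This is a simplified stand-in, not the framework's implementation (the real class manages Docker containers):

```python
class FakeDataCollector:
    """Toy model of the documented lifecycle: tear_down() runs in
    __exit__ only when tear_down_on_exit is True."""

    def __init__(self, tear_down_on_exit=True):
        self.tear_down_on_exit = tear_down_on_exit
        self.events = []

    def start(self):
        self.events.append('start')
        return self

    def tear_down(self):
        self.events.append('tear_down')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self.tear_down_on_exit:
            self.tear_down()

# Default behavior: the instance is torn down on exit.
with FakeDataCollector(tear_down_on_exit=True) as sdc:
    sdc.start()

# With tear_down_on_exit=False, the instance is kept for inspection.
kept = FakeDataCollector(tear_down_on_exit=False)
with kept:
    kept.start()
```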

Models

Classes for SDC-related models.

This module provides implementations of classes with which users may interact in the course of writing tests that exercise SDC functionality.

class streamsets.testframework.sdc_models.Pipeline(filepath=None, filename=None, pipeline=None, all_stages=None)[source]

SDC pipeline.

This class provides abstractions to make it easier to interact with a pipeline before it’s imported into SDC. If creating a Pipeline instance from an existing pipeline file, either filepath or filename should be specified, but not both.

Parameters:
  • filepath (str) – Absolute path to pipeline JSON file. Default: None
  • filename (str) – Name of the pipeline JSON file. It’s assumed that the file will be in the caller’s pipelines subdirectory. Default: None
  • pipeline – A Python object representing the deserialized pipeline. Default: None
configure_for_environment(*environments)[source]

Configure pipeline based on environments.

Parameters:environments – One or more environments.
Returns:(testframework.sdc_models.Pipeline)
class streamsets.testframework.sdc_models.PipelineBuilder(pipeline, definitions)[source]

Class with which to build SDC pipelines.

This class allows a user to programmatically generate an SDC pipeline. Instead of instantiating this class directly, most users should use streamsets.testframework.DataCollector.get_pipeline_builder().

Parameters:
  • pipeline – Python object representing an empty pipeline. If created manually, this would come from creating a new pipeline in SDC and then exporting it before doing any configuration.
  • definitions (dict) – The output of SDC’s definitions endpoint.
build(title='Pipeline')[source]

Build the pipeline.

Parameters:title (str, optional) – Pipeline title to use. Default: 'Pipeline'
Returns:An instance of streamsets.testframework.Pipeline
class streamsets.testframework.sdc_models.Stage(stage, label=None)[source]

Pipeline stage.

Parameters:stage – JSON representation of the pipeline stage.
configure_for_environment(*environments)[source]

Set stage configurations based on the environment.

This method iterates through one or more environments and sets stage configurations.

Parameters:environments – One or more environments.

Environments

Core APIs

Environment abstractions to interact with AWS.

class streamsets.testframework.environments.aws.AWSInstance(region=None, **kwargs)[source]

Class that encapsulates AWS instance.

Parameters:
  • region (str) – AWS region. Default: DEFAULT_REGION
  • s3_bucket_name (str) – AWS S3 bucket name.
  • firehose_stream_name (str) – AWS Firehose stream name.
aws_stage_lib

AWS stage lib name.

Returns:(str)
dynamodb

Return AWS Boto3 DynamoDB client.

Returns:(boto3.DynamoDB.Client)
ec2

Return AWS EC2 client.

Returns:(boto3.EC2.Client)
emr

Return AWS Boto3 EMR client.

Returns:(boto3.EMR.Client)
emr_hadoop_versions

AWS EMR Hadoop versions supported for an existing EMR cluster.

Returns:A list of Hadoop version strings in X_Y format.
Return type:(list)
emr_stage_lib_template

AWS EMR stage lib template name. Template holder for EMR Hadoop version.

Returns:(str)
emr_stage_libs

AWS EMR stage lib names.

Returns:(list)
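The relationship between emr_stage_lib_template and emr_stage_libs can be sketched as simple template substitution over the Hadoop versions in X_Y form. The template string below is an assumption for illustration, not the framework's actual value:

```python
# Hypothetical template; the real value comes from emr_stage_lib_template.
TEMPLATE = 'streamsets-datacollector-emr_hadoop_{version}-lib'

def emr_stage_libs(hadoop_versions):
    """Fill the stage-lib template once per supported Hadoop version
    (versions are expected in X_Y form, per emr_hadoop_versions)."""
    return [TEMPLATE.format(version=version) for version in hadoop_versions]

libs = emr_stage_libs(['2_8', '2_7'])
```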
firehose

Return AWS Firehose client.

Returns:(boto3.Firehose.Client)
kinesis

Return AWS Boto3 Kinesis client.

Returns:(boto3.Kinesis.Client)
kinesis_stage_lib

AWS Kinesis stage lib name.

Returns:(str)
s3

Return AWS Boto3 S3 client.

Returns:(boto3.S3.Client)
wait_for_stream_status(stream_name, status, timeout_sec=60)[source]

Wait for a given Kinesis stream status.

Parameters:
  • stream_name (str) – Kinesis stream name.
  • status (str) – Kinesis stream status.
  • timeout_sec (int, optional) – Default timeout for wait, in seconds. Default: 60
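The wait behavior can be sketched as a generic poll-until-status helper. The helper and its poll interval are illustrative; the framework's actual polling logic may differ:

```python
import time

def wait_for_status(get_status, desired, timeout_sec=60, poll_interval_sec=1):
    """Poll get_status() until it returns desired, or raise TimeoutError
    after timeout_sec seconds. A generic model of wait_for_stream_status."""
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        status = get_status()
        if status == desired:
            return status
        time.sleep(poll_interval_sec)
    raise TimeoutError(f'Status did not reach {desired!r} within {timeout_sec} s')

# Simulate a stream that becomes ACTIVE on the third poll.
statuses = iter(['CREATING', 'CREATING', 'ACTIVE'])
result = wait_for_status(lambda: next(statuses), 'ACTIVE',
                         timeout_sec=5, poll_interval_sec=0)
```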

Environment abstractions to interact with Azure.

class streamsets.testframework.environments.azure.AzureDataLakeStore(store_name)[source]

Azure Data Lake Store service.

Parameters:store_name (str) – Azure Data Lake Store name.
file_system

Azure Data Lake Store file system.

Returns:(azure.datalake.store.AzureDLFileSystem)
class streamsets.testframework.environments.azure.AzureEventHubs(namespace, shared_access_key_name, shared_access_key_value)[source]

Azure Event Hub service.

Parameters:
  • namespace (str) – Azure Event Hub’s namespace.
  • shared_access_key_name (str) – Azure Event Hub’s Shared Access Key (SAS) name.
  • shared_access_key_value (str) – Azure Event Hub’s Shared Access Key (SAS) value.
service_bus

Return Azure Event Hub’s Service Bus Service.

Returns:(azure.servicebus.ServiceBusService)
class streamsets.testframework.environments.azure.AzureInstance(auth_token_endpoint=None, datalake_store_account_fqdn=None, iot_hub_topic=None, wasb_container=None)[source]

Class that encapsulates Azure.

Parameters:
  • auth_token_endpoint (str) – Azure Active Directory’s OAuth token endpoint URL. Default: None
  • datalake_store_account_fqdn (str) – The host name of the Data Lake Store. Format: <service name>.azuredatalakestore.net Default: None
  • iot_hub_topic (str) – Azure IoT Hub specific Service Bus routing topic. Default: None
  • wasb_container (str) – Azure Storage Blob container name. Default: None
datalake

Azure Data Lake Store instance.

Returns:(AzureDataLakeStore)
event_hubs

Azure Event Hub instance.

Returns:(AzureEventHubs)
iot_hub

Azure IoT Hub instance.

Returns:(AzureIotHub)
service_bus

Azure Service Bus instance.

Returns:(AzureServiceBus)
storage

Azure Storage instance.

Returns:(AzureStorage)
class streamsets.testframework.environments.azure.AzureIotHub(namespace, shared_access_key_name, shared_access_key_value, host)[source]

Azure IoT Hub instance.

Parameters:
  • namespace (str) – Azure IoT Hub namespace.
  • shared_access_key_name (str) – Azure IoT Hub Shared Access Key (SAS) name.
  • shared_access_key_value (str) – Azure IoT Hub Shared Access Key (SAS) value.
  • host (str) – Azure IoT host name.
create_device_id(device_id)[source]

Create IoT device.

Parameters:device_id (str) – Device ID to use.
Returns:The json response from IoT Hub device creation.
delete_device_id(device_id)[source]

Delete IoT device.

Parameters:device_id (str) – Device ID to delete.
Returns:Device deletion flag.
Return type:(bool)
list_device_ids(top=100)[source]

List IoT device ID’s.

Parameters:top (int, optional) – List number of device ID’s. Default: 100
Returns:The json response from IoT Hub device’s list.
retrieve_device_id(device_id)[source]

Retrieve IoT device details.

Parameters:device_id (str) – Device ID to use.
Returns:The json response from IoT Hub device details retrieval.
send_device_to_cloud_message(device_id, message)[source]

Send message to IoT Hub cloud from IoT device.

Parameters:
  • device_id (str) – Device ID to use.
  • message (str) – Message to be sent.
Returns:Message sent flag.
Return type:(bool)

class streamsets.testframework.environments.azure.AzureServiceBus(namespace, shared_access_key_name, shared_access_key_value)[source]

Azure Service Bus instance.

Parameters:
  • namespace (str) – Azure Service Bus namespace.
  • shared_access_key_name (str) – Azure Service Bus Shared Access Key (SAS) name.
  • shared_access_key_value (str) – Azure Service Bus Shared Access Key (SAS) value.
service

Azure Service Bus Service.

Returns:(azure.servicebus.ServiceBusService)
class streamsets.testframework.environments.azure.AzureStorage(account_name, account_key, wasb_container=None)[source]

Azure Storage instance.

Parameters:
  • account_name (str) – Azure Storage Account name.
  • account_key (str) – Azure Storage Account key.
  • wasb_container (str) – Azure Storage Blob container name. Default: None
account

Azure Storage Account.

Returns:(azure.storage.CloudStorageAccount)
create_blob_container(container_name)[source]

Create Azure Storage Account blob container.

Parameters:container_name (str) – Container name.
Returns:Container creation flag.
Return type:(bool)
delete_blob_container(container_name)[source]

Delete Azure Storage Account blob container.

Parameters:container_name (str) – Container name.
Returns:Container deletion flag.
Return type:(bool)

Environment abstractions to interact with Apache Cassandra.

class streamsets.testframework.environments.cassandra.CassandraInstance(contacts, username=None, password=None, port=None, kerberos_enabled=None)[source]

Class that encapsulates an Apache Cassandra client instance.

Parameters:
  • contacts (list) – List of Cassandra contact points.
  • kerberos_enabled (bool) – Flag to indicate if Kerberos is enabled for Cassandra. Default: DEFAULT_CASSANDRA_KERBEROS_ENABLED
  • username (str) – Cassandra username.
  • password (str) – Cassandra password.
  • port (int) – Cassandra port.
client

Get a client which encapsulates cassandra.cluster.Cluster and cassandra.cluster.Session objects.

Returns:(Client)

Environment abstractions to interact with Cloudera clusters.

class streamsets.testframework.environments.cloudera.ClouderaHbase(cloudera_manager_cluster, service_name)[source]

Cloudera HBase service.

client

Get an HBase client.

Returns:(happybase.Connection)
zookeeper_parent_znode

Get zookeeper.znode.parent from hbase-site.xml.

class streamsets.testframework.environments.cloudera.ClouderaHdfs(cloudera_manager_cluster, service_name)[source]

Cloudera HDFS service.

default_fs

Get fs.defaultFS from core-site.xml.

get_data_from_parquet(hdfs_parquet_file_path)[source]

Get data from HDFS Parquet file.

Parameters:hdfs_parquet_file_path (str) – Parquet file path in HDFS.
Returns:List of Parquet rows. Each row is a collections.OrderedDict object.
Return type:(list)
hadoop_security_authentication

Get hadoop.security.authentication from core-site.xml.

http_address

Get dfs.namenode.http-address from hdfs-site.xml.

write_data_to_avro(hdfs_avro_file_path, schema, records)[source]

Write data to HDFS Avro file.

Parameters:
  • hdfs_avro_file_path (str) – Avro file path in HDFS.
  • schema (json) – Avro schema.
  • records (list) – List of Avro records.

Returns:List of Avro records.
Return type:(list)
class streamsets.testframework.environments.cloudera.ClouderaHive(cloudera_manager_cluster, service_name)[source]

Cloudera Hive service.

client

Get a Hive client.

Returns:(impala.dbapi.connect)
hive_jdbc_url

Get Hive JDBC URL.

Returns:Hive JDBC URL.
Return type:(str)
hive_server2_authentication

Get hive.server2.authentication from hive-site.xml.

class streamsets.testframework.environments.cloudera.ClouderaKafka(cloudera_manager_cluster, service_name)[source]

Cloudera Kafka service.

client(**client_configs)[source]

Get a Kafka client available at default Kafka host and port.

cluster_env_lib_spark1

Get the name of the env stage lib needed by SDC in order to interact with the cluster’s version of Kafka.

Returns:(str)
cluster_stage_lib_spark1

Get the name of the cluster stage lib needed by SDC in order to interact with the cluster’s version of Kafka.

Returns:(str)
cluster_stage_lib_spark2

Get the name of the cluster stage lib needed by SDC in order to interact with the cluster’s version of Kafka. Unlike for the spark1 case, this will return a stage lib for Spark as opposed to for Kafka. e.g. cdh_spark_2_1 is returned when Spark 2.1 is installed.

Returns:(str)
consumer(**consumer_configs)[source]

Get a Kafka consumer.

is_kerberized

Check if Kafka is kerberized.

is_ssl_client_authentication_required

Check if SSL client authentication is enabled for Kafka.

is_ssl_enabled

Check if SSL encryption is enabled for Kafka.

producer(**producer_configs)[source]

Get a Kafka producer.

security_protocol

Returns appropriate value for security.protocol.
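The mapping from the Kerberos and SSL checks above to a security.protocol value follows standard Kafka semantics; whether ClouderaKafka computes it exactly this way is an assumption:

```python
def security_protocol(kerberized, ssl_enabled):
    """Map Kerberos/SSL flags to a Kafka security.protocol value,
    per standard Kafka conventions (SASL = Kerberos here)."""
    if kerberized and ssl_enabled:
        return 'SASL_SSL'
    if kerberized:
        return 'SASL_PLAINTEXT'
    if ssl_enabled:
        return 'SSL'
    return 'PLAINTEXT'
```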

standalone_stage_lib

Get the name of the standalone stage lib needed by SDC in order to interact with the standalone version of Kafka.

Returns:(str)
class streamsets.testframework.environments.cloudera.ClouderaKudu(cloudera_manager_cluster, service_name)[source]

Cloudera Kudu service.

engine

SQLAlchemy engine object.

Returns:(sqlalchemy.engine.Engine)
is_kerberized

Check if Kudu is kerberized.

stage_lib

Get the name of the stage lib needed by SDC in order to interact with the cluster’s version of Kudu.

Returns:(str)
class streamsets.testframework.environments.cloudera.ClouderaManagerCluster(server, username='admin', password='admin', cluster_name='cluster')[source]

Cloudera Manager cluster.

Abstractions to interact with a Cloudera Manager cluster.

Parameters:
  • server (str) – Cloudera Manager server (e.g. node-1.cluster:7180). If a port is excluded, will default to 7180.
  • username (str, optional) – Cloudera Manager server username. Default: admin
  • password (str, optional) – Cloudera Manager server password. Default: admin
  • cluster_name (str, optional) – The managed cluster’s name within Cloudera Manager. Default: cluster
get_cluster_service(service_name)[source]

Get Cloudera Manager service.

Returns:(dict)
get_cluster_services()[source]

Get Cloudera Manager services.

Returns:List of dictionaries for Cloudera services.
Return type:(list)
get_cluster_version()[source]

Get version of CDH cluster.

Returns:CDH version (e.g. ‘5.8.0’).
Return type:(str)
get_cm_service_roles()[source]

Get all roles of the Cloudera Management Service.

Returns:List of roles.
Return type:(list)
get_parcels()[source]

Get Cloudera Manager parcels.

Returns:List of dictionaries containing parcel details.
Return type:(list)
get_service_client_config(service_name)[source]

Get Cloudera service’s client config.

Returns:Zip-compressed archive in the form of a bytes string.
Return type:(str)
get_service_config(service_name)[source]

Get Cloudera service’s config.

Returns:List of dictionaries containing config.
Return type:(list)
get_service_role_config_group_configs(service_name, role_name)[source]

Get config for the role config group of this service for the given role name.

Returns:List of dictionaries containing config.
Return type:(list)
get_service_role_config_groups(service_name)[source]

Get names for Cloudera service’s role config groups.

Returns:List of dictionaries containing role config groups.
Return type:(list)
class streamsets.testframework.environments.cloudera.ClouderaManagerService(cloudera_manager_cluster, service_name)[source]

Cloudera Manager service.

Parameters:
  • cloudera_manager_cluster (testframework.environments.cloudera.ClouderaManagerCluster) –
  • service_name (str) –
get_client_configs()[source]

Get Cloudera Manager service’s client configs.

Returns:Cloudera Manager’s client configs.
Return type:(zipfile.ZipFile)
get_configs_from_xml_file(config_filename)[source]

Get a dictionary of configurations from a config file.

Parameters:config_filename (str) –
Returns:(dict) Configurations mapped from property name => property value.
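The documented return shape matches the standard Hadoop *-site.xml layout, which can be parsed with the standard library alone. This is a sketch of the idea, not the framework's implementation:

```python
import xml.etree.ElementTree as ET

def configs_from_xml(xml_text):
    """Parse a Hadoop-style *-site.xml document into a dict mapping
    property name => property value."""
    root = ET.fromstring(xml_text)
    return {prop.findtext('name'): prop.findtext('value')
            for prop in root.iter('property')}

core_site = """
<configuration>
  <property><name>fs.defaultFS</name><value>hdfs://node-1.cluster:8020</value></property>
  <property><name>hadoop.security.authentication</name><value>kerberos</value></property>
</configuration>
"""
configs = configs_from_xml(core_site)
```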
get_service_config()[source]

Get Cloudera service’s config.

Returns:Configuration object with service configurations.
Return type:(testframework.models.Configuration)
get_service_role_config_group_configs(role_name)[source]

Get config for the role config group of this service for the given role name.

Returns:Configuration object of role config group.
Return type:(testframework.models.Configuration)
get_service_role_config_group_names(role_type)[source]

Get names for Cloudera service’s role config groups for the given role type.

Returns:List of names of role config groups.
Return type:(list)
class streamsets.testframework.environments.cloudera.ClouderaSolr(cloudera_manager_cluster, service_name)[source]

Cloudera Solr service.

client

Get a Solr client.

Returns:(SolrClient.solrclient.SolrClient)
class streamsets.testframework.environments.cloudera.ClouderaSpark2OnYarn(cloudera_manager_cluster, service_name)[source]

Cloudera Spark 2 on YARN service.

class streamsets.testframework.environments.cloudera.ClouderaSparkOnYarn(cloudera_manager_cluster, service_name)[source]

Cloudera Spark 1 on YARN service.

class streamsets.testframework.environments.cloudera.ClouderaYarn(cloudera_manager_cluster, service_name)[source]

Cloudera YARN service.

get_app_status_by_job_id(job_id)[source]

Get YARN application status.

Parameters:job_id (str) – YARN job id.
Returns:Dictionary of YARN application status.
Return type:(dict)
get_app_status_by_name(app_name)[source]

Get YARN application status.

Parameters:app_name (str) – YARN application name.
Returns:List of dictionaries of YARN application status.
Return type:(list)
get_applications_status()[source]

Get YARN applications status.

Returns:List of dictionaries containing application status details.
Return type:(list)
wait_for_app_to_register(app_name, timeout_sec=120)[source]

Wait for a YARN application to register.

Parameters:
  • app_name (str) – YARN application name.
  • timeout_sec (int, optional) – Default timeout for wait, in seconds. Default: 120
Returns:List of dictionaries of YARN application status.
Return type:(list)

wait_for_job_to_end(job_id, timeout_sec=120)[source]

Wait for a YARN job to finish.

Parameters:
  • job_id (str) – YARN job id.
  • timeout_sec (int, optional) – Default timeout for wait, in seconds. Default: 120
Returns:YARN application finished state.
Return type:(str)

wait_for_job_to_register(job_id, timeout_sec=120)[source]

Wait for YARN job to be registered.

Parameters:
  • job_id (str) – YARN job id.
  • timeout_sec (int, optional) – Default timeout for wait, in seconds. Default: 120
Returns:Dictionary of YARN application status.
Return type:(dict)

class streamsets.testframework.environments.cloudera.Navigator(cloudera_manager_cluster)[source]

Cloudera Navigator.

Parameters:cloudera_manager_cluster (testframework.environments.cloudera.ClouderaManagerCluster) –
get_entities(query=None, ids=None, limit=100, offset=0)[source]

Get entities from Cloudera Navigator.

Returns:List of dictionaries containing entities.
Return type:(list)

streamsets.testframework.environments.cloudera.get_config_file(config_filename, client_configs)[source]

Get config file contents from client configs.

Parameters:
  • config_filename (str) –
  • client_configs (zipfile.ZipFile) –
Returns:The contents of the config file.
Return type:(str)

Environment abstractions to interact with Confluent tools.

class streamsets.testframework.environments.confluent.ConfluentInstance(registry_urls=None)[source]

Class that encapsulates Confluent instance.

Currently the only supported piece is schema registry.

Parameters:registry_urls (list) – List of Schema Registry URLs.
schema_registry

Return schema registry client.

Environment abstractions to interact with Couchbase.

class streamsets.testframework.environments.couchbase.CouchbaseInstance(uri)[source]

Class that encapsulates Couchbase client instance.

Parameters:uri (str) – Couchbase URI in couchbase://<username>:<password>@host:port format.
admin

Couchbase admin object.

Returns:(couchbase.admin.Admin)
cluster

Couchbase cluster object.

Returns:(couchbase.cluster.Cluster)
wait_for_healthy_bucket(bucket, timeout_sec=30)[source]

Wait until a bucket is healthy (or timeout).
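The documented URI format can be parsed with the standard library; the helper name below is illustrative, not part of the framework:

```python
from urllib.parse import urlsplit

def parse_couchbase_uri(uri):
    """Split a couchbase://<username>:<password>@host:port URI into
    its components, per the documented CouchbaseInstance format."""
    parts = urlsplit(uri)
    if parts.scheme != 'couchbase':
        raise ValueError(f'Unexpected scheme: {parts.scheme!r}')
    return {'username': parts.username, 'password': parts.password,
            'host': parts.hostname, 'port': parts.port}

info = parse_couchbase_uri('couchbase://admin:password@couchbase.cluster:8091')
```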

Database environments.

The classes in this module enable interaction with a variety of databases.

class streamsets.testframework.environments.databases.MySqlDatabase(database, username=None, password=None, credential_store=None)[source]

MySQL database.

Parameters:
  • database (str) – The complete database connection string. This is everything that would normally come after jdbc:mysql:// and is normally of the form host:port/database.
  • username (str, optional) – MySQL username. Default: mysql
  • password (str, optional) – MySQL password. Default: mysql
  • credential_store (streamsets.testframework.credential_store.CredentialStore, optional) – Default: None
connection

MySQL connection object.

Returns:(pymysql.connect)
engine

SQLAlchemy engine object.

Returns:(sqlalchemy.engine.Engine)
class streamsets.testframework.environments.databases.Oracle(database, username=None, password=None, credential_store=None)[source]

Oracle database.

Parameters:
  • database (str) – The complete database connection string. This is everything that would normally come after oracle:// and is normally of the form host:port/database.
  • username (str, optional) – Oracle username. Default: sdc
  • password (str, optional) – Oracle password. Default: streamsets
  • credential_store (streamsets.testframework.credential_store.CredentialStore, optional) – Default: None
engine

SQLAlchemy engine object.

Returns:(sqlalchemy.engine.Engine)
class streamsets.testframework.environments.databases.PostgreSqlDatabase(database, username=None, password=None, credential_store=None)[source]

PostgreSQL database.

Parameters:
  • database (str) – The complete database connection string. This is everything that would normally come after jdbc:postgresql:// and is normally of the form host:port/database.
  • username (str, optional) – PostgreSQL username. Default: postgres
  • password (str, optional) – PostgreSQL password. Default: postgres
  • credential_store (streamsets.testframework.credential_store.CredentialStore, optional) – Default: None
engine

SQLAlchemy engine object.

Returns:(sqlalchemy.engine.Engine)
is_cdc_enabled

Checks if CDC is enabled for this instance of PostgreSQL.

Returns:(bool)
class streamsets.testframework.environments.databases.SQLServerDatabase(database, username=None, password=None, credential_store=None)[source]

SQL Server database.

Parameters:
  • database (str) – The complete database connection string. This is everything that would normally come after jdbc:sqlserver:// and is normally of the form host:port/database.
  • username (str, optional) – SQLServer username. Default: SA
  • password (str, optional) – SQLServer password. Default: Passw@rd1!
  • credential_store (streamsets.testframework.credential_store.CredentialStore, optional) – Default: None
engine

SQLAlchemy engine object.

Returns:(sqlalchemy.engine.Engine)

Environment abstractions to interact with Elasticsearch.

class streamsets.testframework.environments.elasticsearch.ElasticsearchInstance(url)[source]

Class that encapsulates an Elasticsearch client instance.

Parameters:url (str) – Elasticsearch URL (e.g. http://elastic:changeme@myelastic.cluster:9200/).
connect()[source]

Establish a default connection to Elasticsearch.

get_version()[source]

Get version of Elasticsearch.

Returns:Elasticsearch version (e.g. ‘5.2.0’).
Return type:(str)

Environment abstractions to interact with Google products.

class streamsets.testframework.environments.gcp.GCPInstance(project_name, credentials_filename, bigtable_instance_name=None)[source]

Class that encapsulates Google Cloud client instances.

Parameters:
  • project_name (str) – Google Cloud project name.
  • credentials_filename (str) – The filename (JSON) of Google Cloud credentials. This file is assumed to exist in TESTFRAMEWORK_CONFIG_DIRECTORY (default: ~/.testframework).
bigquery_client

Google BigQuery client object.

Returns:(bigquery.Client)
bigtable_instance

Google Cloud Bigtable instance.

Returns:(bigtable.instance.Instance)
pubsub_publisher_client

Google Cloud Pub/Sub publisher client object.

Returns:(pubsub_v1.PublisherClient)
pubsub_subscriber_client

Google Cloud Pub/Sub subscriber client object.

Returns:(pubsub_v1.SubscriberClient)
storage_client

Google Cloud Storage client object.

Returns:(storage.Client)

Environment abstractions to interact with Ambari clusters.

class streamsets.testframework.environments.hortonworks.AmbariCluster(server_url, username='admin', password='admin', cluster_name='cluster')[source]

Ambari cluster.

Parameters:
  • server_url (str) – Ambari server URL.
  • username (str, optional) – Ambari username. Default: admin
  • password (str, optional) – Ambari password. Default: admin
  • cluster_name (str, optional) – Ambari cluster name. Default: cluster
class streamsets.testframework.environments.hortonworks.AmbariService(ambari_cluster, service_name)[source]

Ambari service.

Parameters:
  • ambari_cluster (testframework.environments.hortonworks.AmbariCluster) –
  • service_name (str) –
get_configs_from_xml_file(config_filename)[source]

Get a dictionary of configurations from a config file.

Parameters:config_filename (str) –

Returns:(dict) Configurations mapped from property name => property value.
streamsets.testframework.environments.hortonworks.DEFAULT_AMBARI_CLUSTER_NAME = 'cluster'
streamsets.testframework.environments.hortonworks.DEFAULT_AMBARI_PASSWORD = 'admin'
streamsets.testframework.environments.hortonworks.DEFAULT_AMBARI_USERNAME = 'admin'
class streamsets.testframework.environments.hortonworks.HortonworksHbase(ambari_cluster, service_name)[source]

Hortonworks HBase service.

client

Get an HBase client.

Returns:(happybase.Connection)

zookeeper_parent_znode

Get zookeeper.znode.parent from hbase-site.xml.

class streamsets.testframework.environments.hortonworks.HortonworksHdfs(ambari_cluster, service_name)[source]

Hortonworks HDFS service.

client

Get an HDFS client.

Returns:An instance of hdfs.InsecureClient
http_address

Get dfs.namenode.http-address from hdfs-site.xml.

class streamsets.testframework.environments.hortonworks.HortonworksMapReduce2(ambari_cluster, service_name)[source]

Hortonworks MapReduce 2 service.

class streamsets.testframework.environments.hortonworks.HortonworksYarn(ambari_cluster, service_name)[source]

Hortonworks YARN service.

Environment abstractions for HTTP based services.

class streamsets.testframework.environments.http.HTTPClientInstance(http_server_url)[source]

Class that encapsulates interactions with HTTP server.

Parameters:http_server_url (str) – HTTP server URL.
mock()[source]

Get an HTTP server mock.

Returns:(HTTPMock)

Environment abstractions to interact with InfluxDB.

class streamsets.testframework.environments.influxdb.InfluxDBInstance(uri)[source]

Class that encapsulates InfluxDB client instance.

Parameters:uri (str) – InfluxDB URI in scheme://[username:password]@host:port[/[database]] format where scheme is one of influxdb or https+influxdb or udp+influxdb (e.g. influxdb://sdcuser:sdcpass@myinfluxdb.cluster:8086/testdb).
client

Get an InfluxDB client.

Returns:(influxdb.InfluxDBClient)
drop_measurement(measurement)[source]

Drop a measurement from InfluxDB.

Parameters:measurement (str) –
Returns:results (influxdb.resultset.ResultSet)

Environment abstractions to interact with JMS.

class streamsets.testframework.environments.jms.JMSInstance(uri, client_port=None)[source]

Class that encapsulates JMS client instance.

Parameters:uri (str) – JMS URI in <protocol>://<host>:<port> format.
client_connection

ActiveMQ client object that uses stomp.

Returns:(stomp.Connection)

Environment abstractions to interact with Apache Kafka clusters.

class streamsets.testframework.environments.kafka.KafkaCluster(brokers, zookeeper, version)[source]

Kafka cluster.

Parameters:
  • brokers (str) – Comma separated list of Kafka brokers.
  • zookeeper (str) – Comma separated list of zookeeper instances used for the cluster.
  • version (str) – Kafka version (e.g. 1.0.0 or 0.10.0.0)
stage_lib

Return name of the standalone stage library.

class streamsets.testframework.environments.kafka.KafkaInstance(stage_lib, brokers)[source]

Holder class for tests to access the Kafka clients.

client(**configs)[source]

Return instantiated Kafka Client.

cluster_stage_lib

Provided for compatibility with other Kafka ‘providers’.

consumer(**configs)[source]

Return instantiated Kafka Consumer.

producer(**configs)[source]

Return instantiated Kafka Producer.

standalone_stage_lib

Return name of the standalone stage library.

Abstractions for interacting with LDAP servers.

class streamsets.testframework.environments.ldap.OpenLdapServer(secured=False, docker_network='cluster')[source]

OpenLDAP server.

Parameters:
  • secured (bool, optional) – Default: False
  • docker_network (str, optional) – Name of the Docker network to which to connect the OpenLDAP container. Default: cluster
register_entries(ldif_filename)[source]

Register entries.

start()[source]

Start the OpenLDAP server.

streamsets.testframework.environments.ldap.get_ldapadd_command(ldif_filename)[source]

Get ldapadd command.

Returns:ldapadd command.
Return type:(str)

Environment abstractions to interact with MapR clusters.

streamsets.testframework.environments.mapr.DEFAULT_MAPR_CLUSTER_NAME = 'my.cluster.com'
streamsets.testframework.environments.mapr.DEFAULT_MAPR_PASSWORD = 'mapr'
streamsets.testframework.environments.mapr.DEFAULT_MAPR_USERNAME = 'mapr'
class streamsets.testframework.environments.mapr.MapRCluster(server_url, username='mapr', password='mapr', cluster_name='my.cluster.com')[source]

MapR cluster.

Abstractions to interact with a MapR cluster.

The discrepancy between the default values of the initializer in the auto-generated method signature and the values below is owed to logic used to convert the arguments into instance attributes.

Parameters:
  • server_url (str) – MapR Control System url (e.g. https://node-1.cluster:8443?mep=4.0).
  • username (str, optional) – MapR Control System username. Default: DEFAULT_MAPR_USERNAME
  • password (str, optional) – MapR Control System password. Default: DEFAULT_MAPR_PASSWORD
execute_command(*command, http_request_method='GET', params=None, data=None)[source]

Execute a command through the MapR REST API.

See http://maprdocs.mapr.com/home/ReferenceGuide/REST-API-Syntax.html for a reference on available commands.

Example

>>> cluster.execute_command('table', 'create', http_request_method='POST',
                            data={'path': '/user/sdc/table_name',
                                  'defaultreadperm': 'p',
                                  'defaultwriteperm': 'p'})
Parameters:
  • command – One or more string commands to execute.
  • http_request_method (str, optional) – HTTP method to use for request.
  • params (dict, optional) – Parameters to add to the command.
  • data (dict, optional) – Data to send with the request.
Returns:

The Python object of the JSON response.

get_cluster_version(cluster_name)[source]

Get cluster version.

Parameters:cluster_name (str) –
Returns:(str)
get_dashboard_info(cluster, multi_cluster_info=False, version=False, zkconnect=None)[source]

Get dashboard info.

Parameters:
  • cluster (str) – MapR cluster name.
  • multi_cluster_info (bool, optional) – Whether to display cluster information from multiple clusters. Default: False
  • version (bool, optional) – Whether to display the version. Default: False
  • zkconnect (str) – ZooKeeper Connect string. Default: None
Returns:

(dict) Summary of information about the cluster.

class streamsets.testframework.environments.mapr.MapRDB(mapr_cluster)[source]

MapR-DB service.

client

Get a MapR-DB client.

Returns:(happybase.Connection)
class streamsets.testframework.environments.mapr.MapRFS(mapr_cluster)[source]

MapR-FS service.

client

Get a MapR-FS client.

Returns:(hdfs.InsecureClient)
class streamsets.testframework.environments.mapr.MapRService(mapr_cluster)[source]

MapR service.

Parameters:mapr_cluster (testframework.environments.mapr.MapRCluster) –

Environment abstractions to interact with MongoDB database.

class streamsets.testframework.environments.mongodb.MongoDBDatabase(uri)[source]

Class that encapsulates MongoDB client instance.

Parameters:uri (str) – MongoDB URI in mongodb://<username>:<password>@<host>:<port>/<database>?<options> format.
engine

PyMongo MongoClient object.

Returns:(pymongo.MongoClient)

Utility for encapsulating an MQTT broker and associated client.

class streamsets.testframework.environments.mqtt.MQTTInstance(broker)[source]

Class that encapsulates broker and client.

destroy()[source]

Cleans up resources associated with the MQTT instance.

do_mqtt_sanity_check(topic)[source]

Perform a sanity check to ensure broker is functioning (publish a message and consume it).

get_messages(topic, num=1, timeout_secs=10)[source]

Get messages from the queue that receives messages from all subscribed topics.

initialize(initial_topics=None, qos=2)[source]

Initializes the MQTT client (with initial subscriptions).

publish_message(payload, topic='testframework_mqtt_topic', qos=2)[source]

Publishes a message to the specified topic with specified QOS.
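The get_messages behavior above (a single queue fed by all subscribed topics, drained with a timeout) can be sketched with the standard library. The helper below is an illustration, not the framework's implementation, and applies the timeout per message rather than in total.

```python
import queue

# Sketch (assumed semantics): drain up to ``num`` messages from the shared
# queue, returning whatever arrived before the timeout elapsed.
def get_messages(message_queue, num=1, timeout_secs=10):
    messages = []
    try:
        for _ in range(num):
            messages.append(message_queue.get(timeout=timeout_secs))
    except queue.Empty:
        pass  # Timed out waiting; return whatever arrived in time.
    return messages

q = queue.Queue()
q.put('hello')
```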

Environment abstractions to interact with Pulsar pub-sub messaging system.

class streamsets.testframework.environments.pulsar.PulsarAdmin(web_service_url, tls_type=None)[source]

Class that encapsulates Pulsar admin REST interface.

Parameters:
  • web_service_url (str) – Pulsar web broker service URL (e.g. https://pulsar.cluster:8443).
  • tls_type (TlsType, optional) – Type of TLS to use for REST calls. Default: None
cluster_name

Get Pulsar cluster name.

Returns:(str)
delete(endpoint, params=None)[source]

REST DELETE interface.

delete_topic(topic, force=False, authoritative=False)[source]

Delete topic.

Parameters:
  • topic (str) – Topic in the form of <persistent|non-persistent>://{tenant}/{namespace}/{topic name}
  • force (bool, optional) – Default: False
  • authoritative (bool, optional) – Default: False
get(endpoint, params=None)[source]

REST GET interface.

get_cluster_config(cluster_name)[source]

Get Pulsar cluster configuration data.

Parameters:cluster_name (str) –
Returns:(dict)
post(endpoint, params=None, data=None)[source]

REST POST interface.

put(endpoint, params=None, data=None)[source]

REST PUT interface.
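The topic format expected by delete_topic can be validated with a small helper. parse_topic below is hypothetical (not part of the framework) and only illustrates the <persistent|non-persistent>://{tenant}/{namespace}/{topic name} shape.

```python
import re

# Hypothetical validator for the Pulsar topic string format used above.
TOPIC_PATTERN = re.compile(r'^(persistent|non-persistent)://([^/]+)/([^/]+)/(.+)$')

def parse_topic(topic):
    match = TOPIC_PATTERN.match(topic)
    if not match:
        raise ValueError(f'Invalid Pulsar topic: {topic}')
    domain, tenant, namespace, name = match.groups()
    return dict(domain=domain, tenant=tenant, namespace=namespace, name=name)
```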

class streamsets.testframework.environments.pulsar.PulsarInstance(web_service_url)[source]

Class that encapsulates Pulsar instance.

Parameters:web_service_url (str) – Pulsar web service URL (e.g. https://pulsar.cluster:8443).
client

Get Pulsar client.

Returns:(pulsar.Client)
class streamsets.testframework.environments.pulsar.TlsType[source]

An enumeration.

Environment abstractions to interact with RabbitMQ message broker.

class streamsets.testframework.environments.rabbitmq.RabbitMQInstance(url)[source]

Class that encapsulates RabbitMQ instance.

Parameters:url (str) – RabbitMQ URL (e.g. amqp://guest:guest@test-rabbit.cluster:5672/%2F).
blocking_connection

RabbitMQ blocking connection object.

Returns:(pika.adapters.blocking_connection.BlockingConnection)

Environment abstractions to interact with Redis.

class streamsets.testframework.environments.redis.RedisInstance(uri)[source]

Class that encapsulates Redis client instance.

Parameters:uri (str) – Redis URI in redis://[:password]@host:port[/[database]] format (e.g. redis://myredis.cluster:6379/0).
client

Get a Redis client.

Returns:(redis.StrictRedis)

Environment abstractions to interact with Salesforce.

class streamsets.testframework.environments.salesforce.SalesforceInstance(password, username)[source]

Class that encapsulates Salesforce client instances.

Parameters:
  • password (str) – Salesforce password.
  • username (str) – Salesforce username.
client

Salesforce client object.

Returns:(Salesforce)

Environment abstractions to interact with Solr.

class streamsets.testframework.environments.solr.SolrInstance(uri)[source]

Class that encapsulates interactions with Solr.

Parameters:uri (str) – Solr URI in http://<host>:<port>/solr/<core_name> format.
client

Get a Solr client.

Returns:(SolrClient.solrclient.SolrClient)
get_version()[source]

Get version of Solr.

Returns:Solr version (e.g. ‘6.1.0’).
Return type:(str)

Environment abstractions to interact with TCP server.

class streamsets.testframework.environments.tcp.TCPInstance(host, port)[source]

Class that encapsulates interactions with TCP Server.

Example:

>>> from testframework.environment import TCPClient
>>> data = b'<42>Mar 24 17:18:10 10.1.2.34 Got an error'
>>> client = TCPClient('localhost', 9876)
>>> response = client.send_bytes_and_ack(data, ack_in_seconds=1, randomly_slice=True)
Parameters:
  • host (str) – TCP Server host.
  • port (int) – TCP Server port.
send_bytes_and_ack(data, ack_in_seconds=5, randomly_slice=False)[source]

Send the given bytes to the TCP Server and wait up to the given number of seconds for data received as an acknowledgement.

Parameters:
  • data (bytes) – Bytes to send to the TCP Server.
  • ack_in_seconds (int, optional) – Number of seconds to wait for acknowledgement data from the TCP Server. Default: 5
  • randomly_slice (bool, optional) – Slice the given data into random chunks before sending, to exercise TCP framing. Default: False
Returns:(bytes) Data received from the TCP Server.
send_bytes_streaming(func, data, for_seconds=5, randomly_slice=False)[source]

Send the given bytes to the TCP server continuously for for_seconds seconds, invoking the given callback function while streaming.

Parameters:
  • func – Callback function to invoke during streaming.
  • data (bytes) – Bytes to stream to the TCP Server.
  • for_seconds (int, optional) – Number of seconds to stream data. Default: 5
  • randomly_slice (bool, optional) – Slice the given data into random chunks before sending, to exercise TCP framing. Default: False
send_str_and_ack(data, encode='UTF-8', ack_in_seconds=5, randomly_slice=False)[source]

Send the given string to the TCP Server and wait up to the given number of seconds for data received as an acknowledgement.

Parameters:
  • data (str) – String to send to the TCP Server.
  • encode (str, optional) – Encoding of the given string. Default: UTF-8
  • ack_in_seconds (int, optional) – Number of seconds to wait for acknowledgement data from the TCP Server. Default: 5
  • randomly_slice (bool, optional) – Slice the given data into random chunks before sending, to exercise TCP framing. Default: False
Returns:(str) Data received from the TCP Server, decoded with the given encoding.
send_str_streaming(func, data, encode='UTF-8', for_seconds=5, randomly_slice=False)[source]

Send the given string to the TCP server continuously for for_seconds seconds, invoking the given callback function while streaming.

Parameters:
  • func – Callback function to invoke during streaming.
  • data (str) – String to stream to the TCP Server.
  • encode (str, optional) – Encoding of the given string. Default: UTF-8
  • for_seconds (int, optional) – Number of seconds to stream data. Default: 5
  • randomly_slice (bool, optional) – Slice the given data into random chunks before sending, to exercise TCP framing. Default: False

Environment abstractions to interact with Teradata.

class streamsets.testframework.environments.teradata.TeradataClient(rest_host, system, username, password)[source]

Class that encapsulates Teradata connection session.

Parameters:
  • rest_host (str) – The host name of the server hosting the REST service.
  • system (str) – The name of the system to connect to. For REST, it's the system alias configured in the REST service.
  • username (str) – The database username to use to connect.
  • password (str) – The database password to use to connect.
app_name

App name generated for this session.

Type:(str)
session

Type:(teradata.udaexec.UdaExecConnection)
class streamsets.testframework.environments.teradata.TeradataInstance(rest_host, system, username, password)[source]

Class that encapsulates interactions with Teradata database.

Parameters:
  • rest_host (str) – The host name of the server hosting the REST service.
  • system (str) – The name of the system to connect to. For REST, it's the system alias configured in the REST service.
  • username (str) – The database username to use to connect.
  • password (str) – The database password to use to connect.
client

Get Teradata client.

Returns:An instance of streamsets.testframework.environments.teradata.TeradataClient

Abstractions to interact with Azure Key Vault.

class streamsets.testframework.credential_stores.azure.AzureCredentialStore(store_id, vault_url, client_id, client_secret, tenant_id)[source]

Class that encapsulates Azure key vault credential store.

Parameters:
  • store_id (str) – SDC credential store ID.
  • vault_url (str) – Azure Key Vault credential provider URL. Typically as https://<YOUR_KEY_VAULT>.vault.azure.net/
  • client_id (str) – Azure client ID (also known as Application ID).
  • client_secret (str) – Azure client secret key (also known as Application key).
  • tenant_id (str) – Azure tenant ID (also known as Active Directory’s directory ID).
get_latest_key(key_name)[source]

Get latest key bundle.

Parameters:key_name (str) – The name of the key to get.
Returns:(azure.keyvault.models.KeyBundle)
get_latest_secret(secret_name)[source]

Get latest secret bundle.

Parameters:secret_name (str) – The name of the secret to get.
Returns:(azure.keyvault.models.SecretBundle)

Factories

Test Framework environments.

This module should only contain factory classes. These classes can be used by tests to create objects that interact with specific environment implementations. As an example, instead of having separate tests for pipelines that target MySQL and PostgreSQL, using Database allows a user to specify the database against which a test should run from the command line. For example:

# test_databases.py

from testframework import environment

def test_database(args):
    # Depending on what the user passes when running ``pytest --database=...``, database
    # will refer to a specific database implementation.
    database = environment.Database()
class streamsets.testframework.environment.AWS[source]

A factory class that is used to create objects that interact with AWS.

Parameters:
  • region (str) – AWS region.
  • s3_bucket_name (str) – AWS S3 bucket name.
  • firehose_stream_name (str) – AWS Firehose stream name.
class streamsets.testframework.environment.Azure[source]

A factory class that is used to create objects that interact with Azure.

Parameters:
  • auth_token_endpoint (str) – Azure Active Directory’s OAuth token endpoint URL.
  • datalake_store_account_fqdn (str) – The host name of the Data Lake Store. Format: <service name>.azuredatalakestore.net
class streamsets.testframework.environment.Cluster[source]

A factory class that is used to create objects that interact with specific cluster implementations.

Example

>>> from testframework import environment
>>> cluster = environment.Cluster(cluster_server='cm://my-cm-host.com:7180')

The discrepancy between the default values of the initializer in the auto-generated method signature and the values below is owed to logic used to convert the arguments into instance attributes.

Parameters:
  • cluster_server (str) – URL to the cluster server. For example, cm://node-1.cluster:7180, mapr://https://node-1.cluster:8443?mep=4.0, or ambari://http://node-1.cluster:8080.
  • args (str) – All command line arguments
class streamsets.testframework.environment.Database[source]

A factory class that is used to create objects that interact with specific database implementations.

Example

>>> from testframework import environment
>>> db = environment.Database(database='mysql://my-db-host:3306/my-db-name')

The discrepancy between the default values of the initializer in the auto-generated method signature and the values below is owed to logic used to convert the arguments into instance attributes.

Parameters:
  • database (str) – URL to the database server. For example, mysql://hostname:3306/dbname.
  • username (str, optional) – Database username. Default: None
  • password (str, optional) – Database password. Default: None
  • credential_store (streamsets.testframework.credential_store.CredentialStore, optional) – Default: None
class streamsets.testframework.environment.LdapServer[source]

A factory class that is used to create objects that interact with specific LDAP server implementations.

class streamsets.testframework.environment.MQTT[source]

A factory class that is used to create an MQTT instance, which wraps a broker, an embedded client that can communicate with that broker, and convenience methods for publishing and subscribing.

Parameters:broker (str) – Host and [optional] port of MQTT broker. For example: mqtt.cluster:1883.
class streamsets.testframework.credential_store.CredentialStore[source]

A factory class that is used to create objects that interact with specific credential store implementations.

Parameters:
  • store_id (str) – Credential store ID. Must match one of the values of the credentialStores key in SDC's credential_stores.properties file. Valid values include: azure
  • args (str) – All command line arguments

Fixtures

conftest module for testframework package.

This module contains project-specific hook implementations to be used by Pytest.

streamsets.testframework.conftest.sdc_builder(args, sdc_common_hook, sdc_builder_hook, cluster, database, mqtt_broker, rabbitmq, aws, elasticsearch, gcp, jms, solr, mongodb, redis, salesforce, cassandra, http_client, azure, influxdb, couchbase, pulsar)[source]

Create a module-scoped DataCollector instance for use in tests.

By keeping separate fixtures for the ‘build’ and ‘execute’ phase of interacting with SDC, any test using the testframework.sdc_models.PipelineBuilder can automatically become an upgrade test. This fixture is for the former.

streamsets.testframework.conftest.sdc_executor(args, sdc_common_hook, sdc_builder, sdc_executor_hook, cluster, database, mqtt_broker, rabbitmq, aws, elasticsearch, gcp, jms, solr, mongodb, redis, salesforce, cassandra, http_client, azure, influxdb, couchbase, pulsar)[source]

Create a module-scoped DataCollector instance for use in tests.

As described above, this fixture is to be used in the ‘execute’ phase of SDC interactions.

streamsets.testframework.conftest.sdc_builder_hook()[source]

Returns function that sdc_builder fixture will call on the created DataCollector object before it will be started. This is particularly useful when overridden in tests that need to mutate the object configuration before start() method is called. Examples include adding new stages or changing sdc.properties.

streamsets.testframework.conftest.sdc_common_hook()[source]

Returns function that sdc_builder and sdc_executor fixtures call on the created DataCollector object before it will be started. Useful when overridden in tests that need to mutate the object configuration before DataCollector.start() method is called. Examples include adding new stages or changing sdc.properties.

streamsets.testframework.conftest.sdc_executor_hook()[source]

Returns function that sdc_executor fixture will call on the created DataCollector object before it will be started. This is particularly useful when overridden in tests that need to mutate the object configuration before start() method is called. Examples include adding new stages or changing sdc.properties.

conftest module for testframework package.

This module contains project-specific hook implementations to be used by Pytest.

streamsets.testframework.conftest.aws(args, request)[source]

Create a module-scoped AWS instance for use in tests.

streamsets.testframework.conftest.azure(request)[source]

Create a module-scoped Azure instance for use in tests.

streamsets.testframework.conftest.cassandra(args)[source]

Create a module-scoped Cassandra instance for use in tests.

streamsets.testframework.conftest.cluster(args)[source]

Create a module-scoped Cluster instance for use in tests.

streamsets.testframework.conftest.confluent(args)[source]

Create a module-scoped Confluent instance for use in tests.

streamsets.testframework.conftest.couchbase(args)[source]

Create a module-scoped Couchbase instance for use in tests.

streamsets.testframework.conftest.credential_store(args)[source]

Create a module-scoped CredentialStore instance.

streamsets.testframework.conftest.database(args, credential_store)[source]

Create a module-scoped Database instance for use in tests.

streamsets.testframework.conftest.elasticsearch(args)[source]

Create a module-scoped Elasticsearch instance for use in tests.

streamsets.testframework.conftest.gcp(args)[source]

Create a module-scoped GoogleCloud instance for use in tests.

streamsets.testframework.conftest.http_client(args)[source]

Create a module-scoped HTTP client instance for use in tests.

streamsets.testframework.conftest.influxdb(args)[source]

Create a module-scoped InfluxDB instance for use in tests.

streamsets.testframework.conftest.jms(args)[source]

Create a module-scoped JMS instance for use in tests.

streamsets.testframework.conftest.mongodb(args)[source]

Create a module-scoped MongoDB instance for use in tests.

streamsets.testframework.conftest.mqtt_broker(args)[source]

Create a module-scoped MQTT instance for use in tests.

streamsets.testframework.conftest.pulsar(args)[source]

Create a module-scoped Pulsar instance for use in tests.

streamsets.testframework.conftest.rabbitmq(args)[source]

Create a module-scoped RabbitMQ instance for use in tests.

streamsets.testframework.conftest.redis(args)[source]

Create a module-scoped Redis instance for use in tests.

streamsets.testframework.conftest.salesforce(args)[source]

Create a module-scoped Salesforce instance for use in tests.

streamsets.testframework.conftest.solr(args)[source]

Create a module-scoped Solr instance for use in tests.

streamsets.testframework.conftest.teradata(args)[source]

Create a module-scoped Teradata instance for use in tests.

System models

Classes for System related models.

class streamsets.testframework.system.Container[source]

Class to encapsulate Docker container.

create(container_args)[source]

Create underlying Docker container.

Parameters:container_args (dict) – Container args to pass to underlying Docker create container command.
network_disconnect()[source]

Disconnect attached network from this Container.

network_reconnect()[source]

Re-connect attached network to this Container.

Utility functions

Assorted utility functions to be used by Test Framework entities.

streamsets.testframework.utils.DEFAULT_TIMEOUT = 60
streamsets.testframework.utils.DEFAULT_TIME_BETWEEN_CHECKS = 1
class streamsets.testframework.utils.ShellCommand(command, user)
command

Alias for field number 0

user

Alias for field number 1

class streamsets.testframework.utils.Version(version)[source]

Maven version string abstraction.

Use this class to enable correct comparison of Maven versioned projects. For our purposes, any version is equivalent to any other version that has the same four-component version number (i.e. 3.0.0.0-SNAPSHOT == 3.0.0.0-RC2 == 3.0.0.0).

Parameters:version (str) – Version string (e.g. ‘2.5.0.0-SNAPSHOT’)
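The comparison rule described above can be sketched as follows. MavenVersion is an illustrative stand-in for the real class: only the numeric components participate in comparison, so qualifiers like -SNAPSHOT and -RC2 are ignored.

```python
# Illustrative sketch (not the framework's implementation) of version
# comparison that ignores the qualifier after the numeric components.
class MavenVersion:
    def __init__(self, version):
        number, _, self.qualifier = version.partition('-')
        self.parts = tuple(int(part) for part in number.split('.'))

    def __eq__(self, other):
        return self.parts == other.parts

    def __lt__(self, other):
        # Tuple comparison on the numeric parts gives ordering for free.
        return self.parts < other.parts
```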
streamsets.testframework.utils.get_params(parameters, exclusions=None)[source]

Get a dictionary of parameters to be passed as requests methods’ params argument.

The typical use of this method is to pass in locals() from a function that wraps a REST endpoint. It will then create a dictionary, filtering out any exclusions (e.g. path parameters) and unset parameters, and use camelize to convert arguments from this_style to thisStyle.
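A minimal sketch of the described behavior (names and exact semantics are assumptions): drop exclusions and unset values, then camelize the remaining snake_case keys.

```python
# Hypothetical helpers illustrating the filtering and key conversion
# described above; not the framework's actual implementation.
def camelize(name):
    first, *rest = name.split('_')
    return first + ''.join(word.capitalize() for word in rest)

def get_params(parameters, exclusions=None):
    exclusions = exclusions or []
    return {camelize(key): value
            for key, value in parameters.items()
            if key not in exclusions and value is not None}
```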

streamsets.testframework.utils.get_random_slices(data)[source]

Returns a list of data sliced by random length.
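The slicing behavior can be sketched as below (assumed semantics); whatever the chunk boundaries turn out to be, joining the slices reproduces the original data.

```python
import random

# Sketch: cut ``data`` into consecutive chunks of random length.
def get_random_slices(data):
    slices = []
    index = 0
    while index < len(data):
        size = random.randint(1, len(data) - index)
        slices.append(data[index:index + size])
        index += size
    return slices
```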

streamsets.testframework.utils.get_random_string(characters, length=8)[source]

Returns a string of the requested length consisting of random combinations of the given sequence of string characters.
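A sketch of the described behavior: length characters drawn (with replacement) from the given sequence.

```python
import random
import string

# Illustrative sketch, not the framework's implementation.
def get_random_string(characters, length=8):
    return ''.join(random.choice(characters) for _ in range(length))
```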

streamsets.testframework.utils.get_tarstream(folder_name, files)[source]

Given a resource_folder_name string and dictionary representing resources (with filenames as keys and string representations of the files as values), return a bytes-like object of a tarball containing said resources. This is particularly useful for Docker with its API allowing for extraction of an archive of files to a directory in a container.

streamsets.testframework.utils.join_url_parts(*parts)[source]

Join a URL from a list of parts. See http://stackoverflow.com/questions/24814657 for examples of why urllib.parse.urljoin is insufficient for what we want to do.
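A sketch of slash-safe joining (assumed semantics): unlike urllib.parse.urljoin, every part is kept regardless of leading or trailing slashes.

```python
# Illustrative sketch, not the framework's implementation.
def join_url_parts(*parts):
    return '/'.join(str(part).strip('/') for part in parts if part)
```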

streamsets.testframework.utils.lazy_property(function)[source]

Decorator that makes a property lazy-evaluated.
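A common shape for such a decorator, shown as an illustrative sketch: the wrapped function runs once per instance and its result is cached under a private attribute.

```python
import functools

# Sketch of a lazy property (assumed implementation detail: the cache
# attribute name is derived from the function name).
def lazy_property(function):
    attribute = '_lazy_' + function.__name__

    @property
    @functools.wraps(function)
    def wrapper(self):
        if not hasattr(self, attribute):
            setattr(self, attribute, function(self))
        return getattr(self, attribute)
    return wrapper
```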

streamsets.testframework.utils.parse_sdc_versions(pytest_config)[source]

Parse out SDC versions from --sdc-version.

Parameters:pytest_config (_pytest.config.Config) – pytest config instance.
Returns:
A named tuple with attributes of sdc_version,
pre_upgrade_sdc_version, and post_upgrade_sdc_version.
Return type:(collections.namedtuple)
streamsets.testframework.utils.pipeline_json_encoder(o)[source]

Default method for JSON encoding of custom classes.

streamsets.testframework.utils.port_is_open(address, port)[source]

Returns True if port at address is open.
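The check can be sketched with a plain TCP connection attempt (an illustration, not the framework's implementation):

```python
import socket

# Sketch: report whether a TCP connection to address:port succeeds
# within a short timeout.
def port_is_open(address, port):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1)
        return sock.connect_ex((address, port)) == 0
```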

streamsets.testframework.utils.run_container_shell_commands(docker_client, container_id, shell_commands, stream=True)[source]

Run a list of shell commands in a container.

streamsets.testframework.utils.sdc_value_reader(value)[source]

Helper function which can parse SDC Record value (Record JSON in dict format for example) and convert to SDC implied Python collection with necessary types being converted from SDC to Python.

Parameters:value – SDC Record value.
Returns:The corresponding Python collection with types converted from SDC to Python.
streamsets.testframework.utils.start_docker_container(docker_client, image_name, network, ports, port_bindings=None, publish_all_ports=True, tty=False, override_configs=None)[source]

Starts a Docker container for a specified image, on the specified network, exposing the specified container ports. To control mappings from container ports to host ports, you will need to use the port_bindings param.

See docker-py 1.10.4 docs for info on setting port_bindings: http://docker-py.readthedocs.io/en/1.10.4/volumes/

streamsets.testframework.utils.stop_and_remove_container(docker_client, container_id)[source]

Stops and removes a Docker container with the specified ID.

streamsets.testframework.utils.verify_docker_image_presence(docker_client, image_name)[source]

Verify a Docker image is present on this machine. Attempts to pull if not.

streamsets.testframework.utils.wait_for_condition(condition, condition_args=None, condition_kwargs=None, time_between_checks=1, timeout=60, time_to_success=0, success=None, failure=None)[source]

Wait until a condition is satisfied (or timeout).

Parameters:
  • condition – Callable to evaluate.
  • condition_args (optional) – A list of args to pass to the condition. Default: None
  • condition_kwargs (optional) – A dictionary of kwargs to pass to the condition. Default: None
  • time_between_checks (int, optional) – Seconds between condition checks. Default: DEFAULT_TIME_BETWEEN_CHECKS
  • timeout (int, optional) – Seconds to wait before timing out. Default: DEFAULT_TIMEOUT
  • time_to_success (int, optional) – Seconds for the condition to hold true before it is considered satisfied. Default: 0
  • success (optional) – Callable to invoke when the condition succeeds. The elapsed time is passed as an argument. Default: None
  • failure (optional) – Callable to invoke when timeout occurs. timeout will be passed as an argument. Default: None
Raises:

TimeoutError
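The core polling loop can be sketched as below; this illustration omits time_to_success and the success/failure callbacks that the real function supports.

```python
import time

# Minimal sketch of the wait-until-satisfied loop described above.
def wait_for_condition(condition, time_between_checks=1, timeout=60):
    start = time.time()
    while time.time() - start < timeout:
        if condition():
            return
        time.sleep(time_between_checks)
    raise TimeoutError(f'Condition not satisfied after {timeout} s')
```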

streamsets.testframework.utils.wait_for_container_port_open(docker_client, container_id, port, timeout_sec=60, verbose=False)[source]

Check the accessibility of a container’s port in a loop until it succeeds or times out.

streamsets.testframework.utils.wait_for_port_open(address, port, timeout_sec=60)[source]

Check the accessibility of address:port in a loop until it succeeds or times out.