Testing Layers for use with zope.testing

Test layers with working directories

There is a mixin class that provides useful methods to create a working directory and to take snapshots of it.

>>> from lovely.testlayers.layer import WorkDirectoryLayer

Let us create a sample layer.

>>> class MyLayer(WorkDirectoryLayer):
...     def __init__(self, name):
...         self.__name__ = name
>>> myLayer = MyLayer('mylayer')

To initialize the directories we need to create the directory structure.

>>> myLayer.setUpWD()

We can get paths below the working directory by passing path segments, as with os.path.join.

>>> myLayer.wdPath('a', 'b')
'.../__builtin__.MyLayer.mylayer/work/a/b'

Let us create a directory.

>>> import os
>>> os.mkdir(myLayer.wdPath('firstDirectory'))

And make a snapshot.

>>> myLayer.makeSnapshot('first')

We can check if we have a snapshot.

>>> myLayer.hasSnapshot('first')
True

And now we make a second directory and another snapshot.

>>> os.mkdir(myLayer.wdPath('secondDirectory'))
>>> myLayer.makeSnapshot('second')

We now have 2 directories.

>>> sorted(os.listdir(myLayer.wdPath()))
['firstDirectory', 'secondDirectory']

We now restore the “first” snapshot.

>>> myLayer.restoreSnapshot('first')
>>> sorted(os.listdir(myLayer.wdPath()))
['firstDirectory']

We can also restore the “second” snapshot.

>>> myLayer.restoreSnapshot('second')
>>> sorted(os.listdir(myLayer.wdPath()))
['firstDirectory', 'secondDirectory']

We can also overwrite an existing snapshot by reusing its name.

>>> os.mkdir(myLayer.wdPath('thirdDirectory'))
>>> myLayer.makeSnapshot('first')
>>> myLayer.restoreSnapshot('first')
>>> sorted(os.listdir(myLayer.wdPath()))
['firstDirectory', 'secondDirectory', 'thirdDirectory']
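
The snapshot behaviour shown above can be imitated with plain directory copies. The following is only a sketch for illustration and not the code of WorkDirectoryLayer; the class SnapshotDir and its method names are hypothetical, and the real layer may store its snapshots differently.

```python
import os
import shutil
import tempfile


class SnapshotDir:
    """Hypothetical stand-in for a work directory with named snapshots."""

    def __init__(self, base):
        self.work = os.path.join(base, 'work')
        self.snapshots = os.path.join(base, 'snapshots')
        os.makedirs(self.work)
        os.makedirs(self.snapshots)

    def wd_path(self, *parts):
        # Join path segments below the working directory.
        return os.path.join(self.work, *parts)

    def _snapshot_path(self, ident):
        return os.path.join(self.snapshots, ident)

    def has_snapshot(self, ident):
        return os.path.isdir(self._snapshot_path(ident))

    def make_snapshot(self, ident):
        # Replace any existing snapshot that has the same name.
        target = self._snapshot_path(ident)
        if os.path.isdir(target):
            shutil.rmtree(target)
        shutil.copytree(self.work, target)

    def restore_snapshot(self, ident):
        # Throw away the current state and copy the snapshot back.
        shutil.rmtree(self.work)
        shutil.copytree(self._snapshot_path(ident), self.work)


base = tempfile.mkdtemp()
wd = SnapshotDir(base)
os.mkdir(wd.wd_path('firstDirectory'))
wd.make_snapshot('first')
os.mkdir(wd.wd_path('secondDirectory'))
wd.restore_snapshot('first')
remaining = sorted(os.listdir(wd.wd_path()))
shutil.rmtree(base)
```

After the restore, only the directory that existed when the snapshot was taken remains in the working directory.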

memcached test layer

This layer starts and stops a memcached daemon on a given port (the default is 11222).

>>> from lovely.testlayers import memcached
>>> ml = memcached.MemcachedLayer('ml')

So let us set up the server.

>>> ml.setUp()

Now we can access memcached on port 11222.

>>> import telnetlib
>>> tn = telnetlib.Telnet('localhost', 11222)
>>> tn.close()

After teardown the connection is refused.

>>> ml.tearDown()
>>> tn = telnetlib.Telnet('localhost', 11222)
Traceback (most recent call last):
...
error:...Connection refused...
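
The telnetlib module used above was removed from the standard library in Python 3.13. The same "is something listening on this port?" check can be sketched with the socket module alone. The helper name is_listening is hypothetical and not part of lovely.testlayers; the throwaway listener only stands in for a daemon so the example runs anywhere.

```python
import socket


def is_listening(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        conn = socket.create_connection((host, port), timeout=timeout)
    except OSError:
        return False
    conn.close()
    return True


# Demonstrate against a throwaway listener on an OS-assigned port.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 0))
server.listen(1)
port = server.getsockname()[1]

up = is_listening('127.0.0.1', port)      # listener is running
server.close()
down = is_listening('127.0.0.1', port)    # listener is gone
```

A check of this kind is useful both after setUp (the port should accept connections) and after tearDown (the connection should be refused).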

Nginx test layer

This test layer starts and stops an nginx server.

The layer is constructed with a prefix directory for nginx to run in and an optional path to the nginx command. To demonstrate this, we create a temporary nginx home in which nginx should run.

>>> import tempfile, shutil, os
>>> tmp = tempfile.mkdtemp()
>>> nginx_prefix = os.path.join(tmp, 'nginx_home')
>>> os.mkdir(nginx_prefix)

We have to add a config file at the default location. Let us define a minimal configuration file.

>>> os.mkdir(os.path.join(nginx_prefix, 'conf'))
>>> cfg = file(os.path.join(nginx_prefix, 'conf', 'nginx.conf'), 'w')
>>> cfg.write("""
... events {
...     worker_connections  10;
... }
... http {
...     server {
...       listen 127.0.0.1:12345;
...     }
... }""")
>>> cfg.close()

And the log directory.

>>> os.mkdir(os.path.join(nginx_prefix, 'logs'))
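
The doctest above uses the Python 2 builtin file. On Python 3 the same prefix layout (conf/nginx.conf plus an empty logs directory) could be prepared as in the following sketch. The helper name make_nginx_prefix is hypothetical; the configuration text is the one from the example above.

```python
import os
import tempfile

NGINX_CONF = """
events {
    worker_connections  10;
}
http {
    server {
      listen 127.0.0.1:12345;
    }
}
"""


def make_nginx_prefix(base):
    """Create conf/nginx.conf and logs/ below a new prefix directory."""
    prefix = os.path.join(base, 'nginx_home')
    os.makedirs(os.path.join(prefix, 'conf'))
    os.makedirs(os.path.join(prefix, 'logs'))
    # The context manager closes the file even if the write fails.
    with open(os.path.join(prefix, 'conf', 'nginx.conf'), 'w') as cfg:
        cfg.write(NGINX_CONF)
    return prefix


prefix = make_nginx_prefix(tempfile.mkdtemp())
```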

Let us also define the nginx executable. There is already one installed via buildout in the root directory of this package, so we get the path to this executable. Using a special nginx that is built via buildout is the common way to use this layer. This way the same nginx might be used for local development with the configuration defined by the buildout.

>>> nginx_cmd = os.path.join(os.path.dirname(os.path.dirname(
...     os.path.dirname(os.path.dirname(os.path.abspath(__file__))))),
...                          'parts', 'nginx', 'sbin', 'nginx')

Now we can instantiate the layer.

>>> from lovely.testlayers import nginx
>>> nl = nginx.NginxLayer('nl', nginx_prefix, nginx_cmd=nginx_cmd)

Upon layer setup the server gets started.

>>> nl.setUp()

We can now issue requests. We get a 404 because we didn’t set up any URLs, but for testing this is fine.

>>> import urllib2
>>> urllib2.urlopen('http://localhost:12345/', None, 1)
Traceback (most recent call last):
...
HTTPError: HTTP Error 404: Not Found

Upon layer tearDown the server gets stopped.

>>> nl.tearDown()

We cannot connect to the server anymore now.

>>> urllib2.urlopen('http://localhost:12345/', None, 1)
Traceback (most recent call last):
...
URLError: <urlopen error [Errno 61] Connection refused>

Failures

Startup and shutdown failures are also caught, for example if we try to tear down the layer twice.

>>> nl.tearDown()
Traceback (most recent call last):
...
RuntimeError: Nginx stop failed ...nginx.pid" failed
 (2: No such file or directory)

Or if we try to start the server twice.

>>> nl.setUp()
>>> nl.setUp()
Traceback (most recent call last):
...
RuntimeError: Nginx start failed [emerg]:
 bind() to 127.0.0.1:12345 failed (48: Address already in use)
...
[emerg]: still could not bind()
>>> nl.tearDown()

Clean up the temporary directory; we don’t need it for testing from this point on.

>>> shutil.rmtree(tmp)

Nearly all failures should be caught upon initialization, because the layer does a config check at that point.

Let us provide a non-existent prefix path.

>>> nginx.NginxLayer('nl', 'something')
Traceback (most recent call last):
...
AssertionError: prefix not a directory '.../something/'

Or a non-existent nginx_cmd.

>>> nginx.NginxLayer('nl', '.', 'not-an-nginx')
Traceback (most recent call last):
...
RuntimeError: Nginx check failed /bin/sh: not-an-nginx: command not found

Or a missing or broken configuration. Here we just provide our current working directory as the prefix, which does not contain any configuration files.

>>> nginx.NginxLayer('nl', '.', nginx_cmd)
Traceback (most recent call last):
RuntimeError: Nginx check failed nginx version: nginx/...
[alert]: could not open error log file...
... [emerg] ...
configuration file .../conf/nginx.conf test failed

Cassandra test layer

This layer starts and stops a cassandra instance with a given storage configuration template. For information about cassandra see: http://en.wikipedia.org/wiki/Cassandra_(database)

>>> from lovely.testlayers import cass

An example template exists in this directory which we now use for this example.

>>> import os
>>> storage_conf_tmpl = os.path.join(os.path.dirname(__file__),
...                                  'storage-conf.xml.in')

The following keys are provided when the template gets evaluated. Let us look them up in the example file.

>>> import re
>>> tmpl_pat = re.compile(r'.*\%\(([^ \)]+)\)s.*')
>>> conf_keys = set()
>>> for l in file(storage_conf_tmpl).readlines():
...     m = tmpl_pat.match(l)
...     if m:
...         conf_keys.add(m.group(1))
>>> sorted(conf_keys)
['control_port', 'storage_port', 'thrift_port', 'var']
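
The %(name)s placeholders found above are Python's mapping-based string formatting. As a minimal sketch of how such a template is evaluated: the XML fragment below is made up for illustration (the real storage-conf.xml.in is larger), and all values except the thrift port are placeholders.

```python
# Hypothetical fragment; element names and most values are illustrative.
template = """\
<Storage>
  <ThriftPort>%(thrift_port)s</ThriftPort>
  <StoragePort>%(storage_port)s</StoragePort>
  <DataDir>%(var)s/data</DataDir>
</Storage>
"""

values = {
    'thrift_port': 19160,
    'storage_port': 17000,          # placeholder value
    'control_port': 17001,          # placeholder value, unused by the fragment
    'var': '/tmp/cassandra-var',    # placeholder path
}

# Extra keys in the mapping are ignored; a missing key raises KeyError.
rendered = template % values
```

Because unused keys are ignored, a template only needs to mention the keys it actually cares about.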

With the storage configuration path we can instantiate a new cassandra layer. The thrift_port, storage_port, and control_port are optional keyword arguments for the constructor; each defaults to the corresponding standard port plus 10000.

>>> l = cass.CassandraLayer('l', storage_conf=storage_conf_tmpl)
>>> l.thrift_port
19160

So let us set up the server.

>>> l.setUp()

Now the cassandra server is up and running. We test this by connecting to the thrift port via telnet.

>>> import telnetlib
>>> tn = telnetlib.Telnet('localhost', l.thrift_port)
>>> tn.close()

The connection is refused after teardown.

>>> l.tearDown()
>>> telnetlib.Telnet('localhost', l.thrift_port)
Traceback (most recent call last):
...
error:...Connection refused

myserver control

>>> from lovely.testlayers import mysql
>>> import tempfile, os
>>> tmp = tempfile.mkdtemp()
>>> dbDir = os.path.join(tmp, 'db')
>>> dbDirFake = os.path.join(tmp, 'dbfake')
>>> dbName = 'testing'

Let us create a mysql server.

>>> srv = mysql.Server(dbDir, port=17777)

And init the db.

>>> srv.initDB()
>>> srv.start()
>>> import time
>>> time.sleep(3)
>>> srv.createDB(dbName)

Now we can get a list of databases.

>>> sorted(srv.listDatabases())
['mysql', 'test', 'testing']

If no mysql server is installed on the system we will get an exception:

>>> srv.orig_method = srv.mysqld_path
>>> srv.mysqld_path = lambda: None

>>> srv.start()
Traceback (most recent call last):
IOError: mysqld was not found. Is a MySQL server installed?

>>> srv.mysqld_path = srv.orig_method

Run SQL scripts

We can run scripts from the filesystem.

>>> script = os.path.join(tmp, 'ascript.sql')
>>> f = file(script, 'w')
>>> f.write("""drop table if exists a; create table a (title varchar(64));""")
>>> f.close()
>>> srv.runScripts(dbName, [script])

Dump and Restore

Let us make a dump of our database.

>>> dumpA = os.path.join(tmp, 'a.sql')
>>> srv.dump(dbName, dumpA)

And now we make some changes.

>>> import _mysql
>>> conn = _mysql.connect(host='127.0.0.1', port=17777, user='root', db=dbName)
>>> for i in range(5):
...     conn.query('insert into a values(%i)' % i)
>>> conn.commit()
>>> conn.close()

Another dump.

>>> dumpB = os.path.join(tmp, 'b.sql')
>>> srv.dump(dbName, dumpB)

We restore dumpA and the table is empty.

>>> srv.restore(dbName, dumpA)
>>> conn = _mysql.connect(host='127.0.0.1', port=17777, user='root', db=dbName)
>>> conn.query('select count(*) from a')
>>> conn.store_result().fetch_row()
(('0',),)
>>> conn.close()

Now restore dumpB and we have our 5 rows back.

>>> srv.restore(dbName, dumpB)
>>> conn = _mysql.connect(host='127.0.0.1', port=17777, user='root', db=dbName)
>>> conn.query('select count(*) from a')
>>> conn.store_result().fetch_row()
(('5',),)
>>> conn.close()

If we try to restore a non-existent file we get a ValueError.

>>> srv.restore(dbName, 'asdf')
Traceback (most recent call last):
...
ValueError: No such file '.../asdf'
>>> srv.stop()

MySQLDB Scripts

We can generate a control script for use as a command-line script.

The simplest script is just to define a server.

>>> dbDir2 = os.path.join(tmp, 'db2')
>>> main = mysql.MySQLDBScript(dbDir2, port=17777)
>>> main.start()
>>> sorted(main.srv.listDatabases())
['mysql', 'test']
>>> main.stop()

We can also define a database to be created upon startup.

>>> main = mysql.MySQLDBScript(dbDir2, dbName='hoschi', port=17777)
>>> main.start()
>>> sorted(main.srv.listDatabases())
['hoschi', 'mysql', 'test']
>>> main.stop()

The database is created only once; starting again does not recreate it.

>>> main.start()
>>> main.stop()

And also scripts to be executed.

>>> main = mysql.MySQLDBScript(dbDir2, dbName='hoschi2',
...                          scripts=[script], port=17777)
>>> main.start()

Note that we used the same directory here so the other db is still there.

>>> sorted(main.srv.listDatabases())
['hoschi', 'hoschi2', 'mysql', 'test']

We can run the scripts again. Note that scripts should always be non-destructive, so that when a schema update is due one just needs to run all scripts again.

>>> main.runscripts()
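
What "non-destructive" means for a script can be shown with a script that is safe to run any number of times. The sketch below uses the sqlite3 module from the standard library as a stand-in so that it runs without a database server; the statements are illustrative and the exact "if not exists" syntax available depends on the database in use.

```python
import sqlite3

# Idempotent schema script: running it again changes nothing.
SCRIPT = """
create table if not exists a (title varchar(64));
create index if not exists a_title on a (title);
"""

conn = sqlite3.connect(':memory:')
conn.executescript(SCRIPT)
conn.execute("insert into a values ('hoschi')")
conn.executescript(SCRIPT)  # second run keeps the table and its data
rows = conn.execute('select title from a').fetchall()
conn.close()
```

A script that starts with an unconditional drop table can also be re-run, but it discards the data each time, which is usually not what a schema update should do.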
>>> main.stop()

MySQLDatabaseLayer

Let’s create a layer:

>>> layer = mysql.MySQLDatabaseLayer('testing')

We can get the store uri.

>>> layer.storeURI()
'mysql://localhost:16543/testing'
>>> layer.setUp()
>>> layer.tearDown()

The second time the server is started, it uses the snapshot.

>>> layer.setUp()
>>> layer.tearDown()

If we try to run setup twice or the port is occupied, we get an error.

>>> layer.setUp()
>>> layer.setUp()
Traceback (most recent call last):
RuntimeError: Port already listening: 16543
>>> layer.tearDown()

We can have appsetup definitions and SQL scripts. There is also a convenience class that lets us execute SQL statements as setup.

>>> setup = mysql.ExecuteSQL('create table testing (title varchar(32))')
>>> layer = mysql.MySQLDatabaseLayer('testing', setup=setup)
>>> layer.setUp()
>>> layer.tearDown()
>>> layer = mysql.MySQLDatabaseLayer('testing', setup=setup)
>>> layer.setUp()
>>> layer.tearDown()

Also if the database name is different, the same snapshots can be used.

>>> layer2 = mysql.MySQLDatabaseLayer('testing2', setup=setup)
>>> layer2.setUp()
>>> layer2.tearDown()

If we do not provide the snapshotIdent, the ident is built from the dotted name of the setup callable and the hash of its arguments.

>>> layer.snapshotIdent
u'lovely.testlayers.mysql.ExecuteSQLe449d7734c67c100e0662d3319fe3f410e78ebcf'
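
One way such an ident could be derived is sketched below. This is an assumption, not the package's code: the 40 hexadecimal digits in the output above are consistent with SHA-1, but how the arguments are serialized before hashing is not stated here, so repr() is used purely for illustration. The names snapshot_ident and create_tables are hypothetical.

```python
import hashlib


def snapshot_ident(func, *args, **kwargs):
    """Build an ident from a callable's dotted name and its arguments."""
    dotted = '%s.%s' % (func.__module__, func.__qualname__)
    # Assumption: repr() of the arguments is a stable enough fingerprint.
    payload = repr((args, sorted(kwargs.items()))).encode('utf-8')
    return dotted + hashlib.sha1(payload).hexdigest()


def create_tables(server):
    """Hypothetical setup callable."""


ident_a = snapshot_ident(
    create_tables, 'create table testing (title varchar(32))')
ident_b = snapshot_ident(
    create_tables, 'create table other (title varchar(32))')
```

The point of such a scheme is that the same setup with the same arguments maps to the same snapshot, while any change in the arguments yields a new one.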

Let us provide an ident and scripts.

>>> layer = mysql.MySQLDatabaseLayer('testing3', setup=setup,
...                                  snapshotIdent='blah',
...                                  scripts=[script])
>>> layer.snapshotIdent
'blah'
>>> layer.scripts
['/.../ascript.sql']

On setUp the snapshot for this setup is created; to do so, the setup callable is called with the server as its argument.

>>> layer.setUp()

Upon testSetUp this snapshot is now restored.

>>> layer.testSetUp()

So now we should have the table there.

>>> conn = _mysql.connect(host='127.0.0.1', port=16543, user='root', db=dbName)
>>> conn.query('select * from testing')
>>> conn.store_result().fetch_row()
()
>>> conn.close()

Let us add some data (we are now in a test):

>>> conn = _mysql.connect(host='127.0.0.1', port=16543, user='root', db=dbName)
>>> conn.query("insert into testing values('hoschi')")
>>> conn.commit()
>>> conn.query('select * from testing')
>>> conn.store_result().fetch_row()
(('hoschi',),)
>>> conn.close()
>>> layer.testTearDown()
>>> layer.tearDown()

Finally do some cleanup:

>>> import shutil
>>> shutil.rmtree(tmp)

pgserver control

>>> from lovely.testlayers import pgsql
>>> import tempfile, os
>>> tmp = tempfile.mkdtemp()
>>> dbDir = os.path.join(tmp, 'db')
>>> dbDirFake = os.path.join(tmp, 'dbfake')
>>> dbName = 'testing'

Let us create a postgres server.

>>> srv = pgsql.Server(dbDir, port=16666)

Optionally, we can define a path to a specific postgresql.conf file to use; otherwise defaults are used.

>>> srv.postgresqlConf
'/.../lovely/testlayers/postgresql8....conf'
>>> srvFake = pgsql.Server(dbDirFake, postgresqlConf=srv.postgresqlConf)
>>> srvFake.postgresqlConf == srv.postgresqlConf
True

The path needs to exist.

>>> pgsql.Server(dbDirFake, postgresqlConf='/not/existing/path')
Traceback (most recent call last):
...
ValueError: postgresqlConf not found '/not/existing/path'

We can also specify the pg_config executable which defaults to ‘pg_config’ and therefore needs to be in the path.

>>> srv.pgConfig
'/.../pg_config'
>>> pgsql.Server(dbDirFake, pgConfig='notexistingcommand')
Traceback (most recent call last):
...
ValueError: pgConfig not found 'notexistingcommand'

The server is aware of its version, which is represented as a tuple of ints.

>>> srv.pgVersion
(8, ..., ...)
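
A tuple of ints compares correctly where version strings do not, which is why this representation is convenient. The following sketch shows how a version string could be turned into such a tuple; it is not the package's implementation, the helper name parse_pg_version is hypothetical, and the input mimics the output of `pg_config --version`.

```python
import re


def parse_pg_version(text):
    """Turn output such as 'PostgreSQL 8.3.7' into a tuple of ints."""
    match = re.search(r'(\d+(?:\.\d+)*)', text)
    if match is None:
        raise ValueError('no version number found in %r' % text)
    return tuple(int(part) for part in match.group(1).split('.'))


version = parse_pg_version('PostgreSQL 8.3.7')

# Tuples compare element by element, so 8.10 sorts after 8.9.
newer = parse_pg_version('PostgreSQL 8.10') > parse_pg_version('PostgreSQL 8.9')
```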

And init the db.

>>> srv.initDB()
>>> srv.start()
>>> srv.createDB(dbName)

Now we can get a list of databases.

>>> sorted(srv.listDatabases())
['postgres', 'template0', 'template1', 'testing']

Run SQL scripts

We can run scripts from the filesystem.

>>> script = os.path.join(tmp, 'ascript.sql')
>>> f = file(script, 'w')
>>> f.write("""create table a (title varchar);""")
>>> f.close()
>>> srv.runScripts(dbName, [script])

Or from the shared directories by prefixing it with pg_config. So let us install tsearch2.

>>> script = 'pg_config:share:contrib/tsearch2.sql'
>>> srv.runScripts(dbName, [script])
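
How a 'pg_config:share:...' path might be expanded is sketched below. This is a guess at the semantics and not the package's code: it assumes the middle segment selects one of pg_config's directory options (for example `--sharedir`). The helper resolve_script and the stubbed runner are hypothetical, and the stub replaces the real pg_config call so the example runs without PostgreSQL installed.

```python
import os
import subprocess


def resolve_script(path, pg_config='pg_config', run=subprocess.check_output):
    """Expand 'pg_config:<dir>:<relative path>' into a filesystem path."""
    if not path.startswith('pg_config:'):
        return path
    _, kind, relative = path.split(':', 2)
    # Assumption: 'share' maps to `pg_config --sharedir`.
    base = run([pg_config, '--%sdir' % kind]).decode().strip()
    return os.path.join(base, relative)


def fake_run(cmd):
    """Stand-in for pg_config; returns a made-up share directory."""
    return b'/usr/share/postgresql\n'


resolved = resolve_script('pg_config:share:contrib/tsearch2.sql', run=fake_run)
plain = resolve_script('/tmp/ascript.sql', run=fake_run)
```

Paths without the prefix pass through unchanged, so both kinds of script can be mixed in one list.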

Dump and Restore

Let us make a dump of our database.

>>> dumpA = os.path.join(tmp, 'a.sql')
>>> srv.dump(dbName, dumpA)

And now we make some changes.

>>> import psycopg2
>>> cs = "dbname='%s' host='127.0.0.1' port='16666'" % dbName
>>> conn = psycopg2.connect(cs)
>>> cur = conn.cursor()
>>> for i in range(5):
...     cur.execute('insert into a values(%i)' % i)
>>> conn.commit()
>>> cur.close()
>>> conn.close()

Another dump.

>>> dumpB = os.path.join(tmp, 'b.sql')
>>> srv.dump(dbName, dumpB)

We restore dumpA and the table is empty.

>>> srv.restore(dbName, dumpA)
>>> conn = psycopg2.connect(cs)
>>> cur = conn.cursor()
>>> cur.execute('select count(*) from a')
>>> cur.fetchone()
(0L,)
>>> cur.close()
>>> conn.close()

Now restore dumpB and we have our 5 rows back.

>>> srv.restore(dbName, dumpB)
>>> conn = psycopg2.connect(cs)
>>> cur = conn.cursor()
>>> cur.execute('select count(*) from a')
>>> cur.fetchone()
(5L,)
>>> cur.close()
>>> conn.close()

If we try to restore a non-existent file we get a ValueError.

>>> srv.restore(dbName, 'asdf')
Traceback (most recent call last):
...
ValueError: No such file '.../asdf'
>>> srv.stop()

PGDB Scripts

We can generate a control script for use as a command-line script.

The simplest script is just to define a server.

>>> dbDir2 = os.path.join(tmp, 'db2')
>>> main = pgsql.PGDBScript(dbDir2, port=16666)
>>> main.start()
>>> sorted(main.srv.listDatabases())
['postgres', 'template0', 'template1']
>>> main.stop()

We can also define a database to be created upon startup.

>>> main = pgsql.PGDBScript(dbDir2, dbName='hoschi', port=16666)
>>> main.start()
>>> sorted(main.srv.listDatabases())
['hoschi', 'postgres', 'template0', 'template1']
>>> main.stop()

The database is created only once; starting again does not recreate it.

>>> main.start()
>>> main.stop()

And also scripts to be executed.

>>> main = pgsql.PGDBScript(dbDir2, dbName='hoschi2',
...                          scripts=[script], port=16666)
>>> main.start()

Note that we used the same directory here so the other db is still there.

>>> sorted(main.srv.listDatabases())
['hoschi', 'hoschi2', 'postgres', 'template0', 'template1']

We can run the scripts again. Note that scripts should always be non-destructive, so that when a schema update is due one just needs to run all scripts again.

>>> main.runscripts()
>>> main.stop()

Finally do some cleanup:

>>> import shutil
>>> shutil.rmtree(tmp)

PGDatabaseLayer

Let’s create a layer:

>>> layer = pgsql.PGDatabaseLayer('testing')

We can get the store uri.

>>> layer.storeURI()
'postgres://localhost:15432/testing'
>>> layer.setUp()
>>> layer.tearDown()

The second time the server is started, it uses the snapshot.

>>> layer.setUp()
>>> layer.tearDown()

If we try to run setup twice or the port is occupied, we get an error.

>>> layer.setUp()
>>> layer.setUp()
Traceback (most recent call last):
...
RuntimeError: Port already listening: 15432
>>> layer.tearDown()

We can have appsetup definitions and SQL scripts. There is also a convenience class that lets us execute SQL statements as setup.

>>> setup = pgsql.ExecuteSQL('create table testing (title varchar)')
>>> layer = pgsql.PGDatabaseLayer('testing', setup=setup)
>>> layer.setUp()
>>> layer.tearDown()
>>> layer = pgsql.PGDatabaseLayer('testing', setup=setup)
>>> layer.setUp()
>>> layer.tearDown()

Also if the database name is different, the same snapshots can be used.

>>> layer2 = pgsql.PGDatabaseLayer('testing2', setup=setup)
>>> layer2.setUp()
>>> layer2.tearDown()

If we do not provide the snapshotIdent, the ident is built from the dotted name of the setup callable and the hash of its arguments.

>>> layer.snapshotIdent
u'lovely.testlayers.pgsql.ExecuteSQLf9bb47b1baeff8d57f8f0dadfc91b99a3ee56991'

Let us provide an ident and scripts.

>>> layer = pgsql.PGDatabaseLayer('testing3', setup=setup,
...                               snapshotIdent='blah',
...                               scripts=['pg_config:share:contrib/tsearch2.sql'])
>>> layer.snapshotIdent
'blah'
>>> layer.scripts
['pg_config:share:contrib/tsearch2.sql']

On setUp the snapshot for this setup is created; to do so, the setup callable is called with the server as its argument.

>>> layer.setUp()

Upon testSetUp this snapshot is now restored.

>>> layer.testSetUp()

So now we should have the table there.

>>> cs = "dbname='testing3' host='127.0.0.1' port='15432'"
>>> conn = psycopg2.connect(cs)
>>> cur = conn.cursor()
>>> cur.execute('select * from testing')
>>> cur.fetchall()
[]
>>> cur.close()
>>> conn.close()

Let us add some data (we are now in a test):

>>> conn = psycopg2.connect(cs)
>>> cur = conn.cursor()
>>> cur.execute("insert into testing values('hoschi')")
>>> conn.commit()
>>> cur.execute('select * from testing')
>>> cur.fetchall()
[('hoschi',)]
>>> cur.close()
>>> conn.close()
>>> layer.testTearDown()

Now the next test comes.

>>> layer.testSetUp()

Make sure we can abort a transaction. The storm synch needs to be removed at this time.

>>> import transaction
>>> transaction.abort()

And the data is gone but the table is still there.

>>> conn = psycopg2.connect(cs)
>>> cur = conn.cursor()
>>> cur.execute('select * from testing')
>>> cur.fetchall()
[]
>>> cur.close()
>>> conn.close()
>>> layer.tearDown()

Source distribution: lovely.testlayers-0.1.0a4.tar.gz (29.7 kB)
