Table des matières

Interface Python pour Analog Discovery

1. Introduction

La société Digilent fabrique des appareils USB sous le nom Analog Discovery comportant un oscilloscope, un générateur de signaux et un analyseur logique. Les pilotes de ces appareils sont disponibles sous Windows, Linux et Mac. Le logiciel Waveforms permet d'accéder à toutes les fonctions des Analog Discovery et comporte de nombreuses fonctions de traitement des données, en particulier un analyseur de réseau et un analyseur d'impédance.

Le logiciel est livré avec un SDK qui permet d'accéder aux fonctions du pilote en C ou en Python. Ces fonctions, accessibles dans la library (DLL sous Windows), sont documentées en détail dans le fichier WaveForms SDK Reference Manual.pdf. Des exemples d'utilisation sont aussi donnés en Python. Il s'agit de fonctions de bas niveau dont l'utilisation régulière s'avère difficile d'autant plus que l'accès par ctypes n'est pas aisé. Nous proposons dans ce document des classes d'interface qui permettent d'accéder aux Analog Discovery depuis un script Python avec les types de données de Python et des tableaux numpy.ndarray. Les fonctions membres de ces classes permettent d'accéder rapidement et simplement aux différentes fonctions de ces appareils.

Nous utilisons l'Analog Discovery 2, le plus petit de ces appareils mais le plus intéressant pour son rapport qualité/prix. Il comporte (entre autre) un oscilloscope deux voies différentielles 14 bits et un générateur de signaux 2 voies. Les classes Python présentées dans ce document sont mises au point avec l'Analog Discovery 2 mais devraient en principe fonctionner aussi sur les autres modèles.

L'objectif est de dévevelopper une interface similaire (mais non identique) à celle développée pour la carte Sysam SP5 (CAN Eurosmart : interface pour Python).

Avertissement : le code Python présenté dans ce document est en cours de développement et il est donc susceptible d'évolution et de corrections de bugs. Le nom du fichier Python comporte un numéro de version.

2. Connexion à l'Analog Discovery

L'accès aux fonctions de la Library se fait au moyen du module ctypes. Le fichier dwfconstants.py est livré dans le SDK : il comporte des constantes nécessaires pour la configuration.

analog_0_1.py
from ctypes import *
from dwfconstants import *
import sys
import time
import numpy as np
			 

La classe Device établit la connexion avec un appareil Analog Device branché sur un port USB. Il devrait être possible d'utiliser plusieurs appareils simultanément.

class Error(Exception):
    pass
			 
class DeviceNotFound(Error):
    def __init__(self):
        self.message = "Device not found"

class Device:
    def __init__(self,index=0):
		# index : numéro du périphérique ou -1
        if sys.platform.startswith("win"):
            self.dwf = cdll.LoadLibrary("dwf.dll")
        elif sys.platform.startswith("darwin"):
            self.dwf = cdll.LoadLibrary("/Library/Frameworks/dwf.framework/dwf")
        else:
            self.dwf = cdll.LoadLibrary("libdwf.so")
        version = create_string_buffer(16)
        self.dwf.FDwfGetVersion(version)
        print("DWF Version: "+str(version.value))
        self.hdwf = c_int()
        self.index = index
    def open(self):
        self.dwf.FDwfDeviceOpen(c_int(self.index), byref(self.hdwf))
        if self.hdwf.value == hdwfNone.value:
            szerr = create_string_buffer(512)
            self.dwf.FDwfGetLastErrorMsg(szerr)
            print(szerr.value)
            raise DeviceNotFound()
        cBufSize = c_int()
        self.dwf.FDwfAnalogInBufferSizeInfo(self.hdwf, 0, byref(cBufSize))
        self.InMaxBufferSize = cBufSize.value
        print("Max Input Buffer Size = %d"%self.InMaxBufferSize)
    def close(self):
        self.dwf.FDwfDeviceClose(byref(self.hdwf))
			 

Pour établir une connexion avec le seul Analog Discovery branché :

device = Device(-1)
device.open()
device.close()	 
			 

3. Convertisseur A/N

Le convertisseur A/N (c'est-à-dire la fonction oscilloscope de l'Analog Discovery) est piloté avec la classe AnalogInput.

Le constructeur prend en argument une instance de la classe Device, représentant le périphérique utilisé.

class AnalogInput:
    def __init__(self,device):
        self.dev = device
        self.dwf = device.dwf
        self.hdwf = device.hdwf
        self.triggerType = {'edge':0,'pulse':1,'transition':2,'window':3}
        self.triggerCondition = {'rise':0,'fall':1,'either':3}
        self.size = 0
        self.duration = 0.0
			

La fonction channels définit les voies que l'on veut utiliser. ch est une liste contenant les numéros des voies. rg est une liste contenant les calibres des voies, c'est-à-dire la tension maximale pour chaque voie.

    def channels(self,ch,rg):
        self.ch = ch
        self.rg = rg
        self.numChannels = len(ch)
        for i in range(self.numChannels):
            self.dwf.FDwfAnalogInChannelEnableSet(self.hdwf, c_int(self.ch[i]), c_bool(True))
            self.dwf.FDwfAnalogInChannelRangeSet(self.hdwf, c_int(self.ch[i]), c_double(self.rg[i]))		 
			 

La fonction sampling définit la fréquence d'échantillonnage (freq) et le nombre d'échantillons (size). Elle renvoie le tableau des instants.

    def sampling(self,freq,size):
        self.size = size
        self.freq = freq
        self.duration = size/freq
        self.time = np.arange(size)*1/freq
        if self.numChannels==1:
            self.samples = [(c_double*size)()]
        elif self.numChannels==2:
            self.samples = [(c_double*size)(),(c_double*size)()]
        elif self.numChannels==3:
            self.samples = [(c_double*size)(),(c_double*size)(),(c_double*size)()]
        elif self.numChannels==4:
            self.samples = [(c_double*size)(),(c_double*size)(),(c_double*size)(),(c_double*size)()]
        self.dwf.FDwfAnalogInFrequencySet(self.hdwf, c_double(freq))
        if size <= self.dev.InMaxBufferSize:
            self.dwf.FDwfAnalogInBufferSizeSet(self.hdwf, c_int(size))
        return self.time	
			

Pour l'Analog Discovery 2, la fréquence d'échantillonnage maximale est 100 MS/s.

Il existe plusieurs modes d'acquisition. Le mode Single effectue une acquisition avec remplissage du tampon. Le nombre d'échantillons maximal dans ce mode est la taille du tampon, soit 8192 (213) pour l'Analog Discovery 2. Ce mode est donc assez limité en nombre de points mais il permet d'enregistrer à la fréquence d'échantillonnage la plus grande, soit 100 MHz pour l'Analog Discovery 2. La fonction acquire effectue une acquisition en mode Single et renvoie un tableau contenant les échantillons (sous forme de tension en volt).

    def acquire(self):
        self.dwf.FDwfAnalogInAcquisitionModeSet(self.hdwf,acqmodeSingle) 
        self.dwf.FDwfAnalogInConfigure(self.hdwf, c_int(0), c_int(1))
        sts = c_byte()
        while True:
            self.dwf.FDwfAnalogInStatus(self.hdwf, c_int(1), byref(sts))
            if sts.value == DwfStateDone.value :
                break
        for i in range(self.numChannels):
            self.dwf.FDwfAnalogInStatusData(self.hdwf,c_int(self.ch[i]),self.samples[i],self.size)
        self.voltage = np.zeros((self.numChannels,self.size),float)
        for i in range(self.numChannels):
            self.voltage[i,:] = np.fromiter(self.samples[i],float)
        return self.voltage		  
			  

Cette fonction est bloquante : la boucle while True met en attente de la fin de l'acquisition.

Voici comment programmer sur les deux voies une acquisition à 100 kHz avec le calibre +/- 10 V :

analog = AnalogInput(device)
analog.channels([0,1],[10,10])
fSamp = 100000
size = 8192
time = analog.sampling(fSamp,size)
voltage = analog.record()
u0 = voltage[0,:]
u1 = voltage[1,:]
			  

Le mode d'acquisition Record permet de faire une acquisition pour une durée déterminée, qui peut être beaucoup plus grande que la capacité du tampon. Le nombre d'échantillons n'est pas limité mais, si la fréquence d'échantillonnage est trop grande, il y a risque de perdre certains échantillons. La fréquence d'échantillonnage maximale (sans perte de données) dans ce mode est d'environ 1 MHz. La fonction record lance une acquisition en mode Record et récupère la totalité des échantillons, qu'elle renvoie sous la forme d'un tableau (comme la fonction acquire). Cette fonction est bloquante jusqu'à la fin de l'acquisition.

    def record(self):
        if self.size <= self.dev.InMaxBufferSize:
            return self.acquire()
        cAvailable = c_int()
        cLost = c_int()
        cCorrupted = c_int()
        fLost = 0
        fCorrupted = 0
        # acquisition d'une durée déterminée
        self.dwf.FDwfAnalogInAcquisitionModeSet(self.hdwf,acqmodeRecord)
        self.dwf.FDwfAnalogInRecordLengthSet(self.hdwf, c_double(self.duration))
        self.dwf.FDwfAnalogInConfigure(self.hdwf, c_int(0), c_int(1))
        sts = c_byte()
        self.cSamples = 0 # nombre d'échantillons acquis
        nLost = 0
        nCorrupt = 0
        while True:
            self.dwf.FDwfAnalogInStatus(self.hdwf, c_int(1), byref(sts))
            if self.cSamples == 0 and (sts.value == DwfStateConfig.value or sts.value == DwfStatePrefill.value or sts.value == DwfStateArmed.value) :
                continue
            
            self.dwf.FDwfAnalogInStatusRecord(self.hdwf, byref(cAvailable), byref(cLost), byref(cCorrupted))
            self.cSamples += cLost.value
            if cLost.value :
                fLost = 1
                nLost += cLost.value
            if cCorrupted.value :
                fCorrupted = 1
                nCorrupt += cCorrupted.value
            if cAvailable.value==0 :
                continue
            
            if self.cSamples+cAvailable.value > self.size :
                cAvailable = c_int(self.size-self.cSamples)
            for i in range(self.numChannels):
                self.dwf.FDwfAnalogInStatusData(self.hdwf,c_int(self.ch[i]),byref(self.samples[i],sizeof(c_double)*self.cSamples),cAvailable)
            self.cSamples += cAvailable.value
            if sts.value == DwfStateDone.value: 
                break
        
        self.voltage = np.zeros((self.numChannels,self.size),float)
        for i in range(self.numChannels):
            self.voltage[i,:] = np.fromiter(self.samples[i],float)
        if fLost:
            print("%d samples were lost! Reduce frequency"%nLost)
        if fCorrupted:
            print("%d samples could be corrupted! Reduce frequency"%nCorrupt)
        return self.voltage		
				

Remarque : lorsque la fonction record est appelée avec un nombre d'échantillons programmé inférieur à la taille du tampon, c'est en fait la fonction acquire qui est utilisée pour faire l'acquisition. En conséquence, on pourra utiliser dans tous les cas la fonction record et programmer un nombre d'échantillons inférieur à la taille du tampon si l'on souhaite travailler à très grande fréquence d'échantillonnage.

Lorsque la durée et longue (plus de 1 seconde), il peut être nécessaire de traiter les données en parallèle à l'acquisition (mode Record asynchrone). Les fonctions startRecord et getAvailable permettent de faire cela. La fonction startRecord démarre l'acquisition en mode Record et retourne immédiatement.

    def startRecord(self):
        self.dwf.FDwfAnalogInAcquisitionModeSet(self.hdwf,acqmodeRecord)
        self.dwf.FDwfAnalogInRecordLengthSet(self.hdwf, c_double(self.duration))
        self.dwf.FDwfAnalogInConfigure(self.hdwf, c_int(0), c_int(1))
        self.cSamples = 0 # nombre d'échantillons acquis		 
				 

La fonction getAvailable renvoie les échantillons qui sont déjà acquis depuis le début de l'acquisition. Elle renvoie le nombre d'échantillons nouveaux, le nombre total d'échantillons (depuis le début de l'acquisition) et le tableau contenant les tensions de tous les échantillons depuis le début. C'est une fonction non bloquante : si aucun nouvel échantillon n'est disponible, elle retourne immédiatement en renvoyant la valeur 0 comme nombre d'échantillons nouveaux.

    def getAvailable(self):
        cAvailable = c_int()
        cLost = c_int()
        cCorrupted = c_int()
        fLost = 0
        sts = c_byte()
        fCorrupted = 0
        if self.cSamples >= self.size:
            return 0,self.cSamples,[]
        self.dwf.FDwfAnalogInStatus(self.hdwf, c_int(1), byref(sts))
        
        if self.cSamples == 0 and (sts == DwfStateConfig or sts == DwfStatePrefill or sts == DwfStateArmed) :
            return 0,self.cSamples,[]
        self.dwf.FDwfAnalogInStatusRecord(self.hdwf, byref(cAvailable), byref(cLost), byref(cCorrupted))
        self.cSamples += cLost.value
        if cLost.value :
            fLost = 1
        if cCorrupted.value :
            fCorrupted = 1
        if cAvailable.value==0 :
            return 0,self.cSamples,[]
        if self.cSamples+cAvailable.value > self.size :
            cAvailable = c_int(self.size-self.cSamples)
        for i in range(self.numChannels):
            self.dwf.FDwfAnalogInStatusData(self.hdwf,c_int(self.ch[i]),byref(self.samples[i],sizeof(c_double)*self.cSamples),cAvailable)
        self.cSamples += cAvailable.value
        self.voltage = np.zeros((self.numChannels,self.size),float)
        for i in range(self.numChannels):
            self.voltage[i,:] = np.fromiter(self.samples[i],float)
        if fLost:
            print("%d samples were lost! Reduce frequency"%cLost.value)
        if fCorrupted:
            print("%d samples could be corrupted! Reduce frequency"%cCorrupted.value)
        return cAvailable.value,self.cSamples,self.voltage			  
				  

Si l'on veut obtenir un tableau contenant seulement le dernier paquet, il faut procéder de la manière suivante :

numAvailable,end,voltage = analog.getAvailable()
paquet = voltage[0,end-numAvailable:end] # dernier paquet pour la voie 0  
				  

Le mode d'acquisition Scan Shift permet d'utiliser le tampon interne de l'Analog Discovery (de taille maximale 8192) comme tampon FIFO. Dans ce cas, le tampon contient à tout instant les derniers échantillons numérisés.

La fonction startScanShift déclenche l'acquisition en mode Scan Shift. Il faut noter que le déclenchement matériel est inopérant dans ce mode.

    def startScanShift(self):
        self.dwf.FDwfAnalogInAcquisitionModeSet(self.hdwf,acqmodeScanShift)
        self.dwf.FDwfAnalogInBufferSizeSet(self.hdwf,c_int(self.size))
        self.dwf.FDwfAnalogInConfigure(self.hdwf, c_int(0), c_int(1))
				   

La taille du tampon est la taille (size) définie lors de l'appel de la fonction sampling. Elle doit être inférieure à la taille maximale du tampon (8192 sur Analog Discovery 2). Si une taille supérieure est choisi, c'est la taille maximale qui est retenue.

La fonction getBuffer permet de récupérer le contenu du tampon pour les voies configurées. Elle renvoie le nombre d'échantillons (qui est inférieur à la taille du tampon au début de l'acquisition) et le tableau contenant les tensions.

    def getBuffer(self):
        sts = c_byte()
        self.dwf.FDwfAnalogInStatus(self.hdwf, c_int(1), byref(sts))
        cValid = c_int(0)
        self.dwf.FDwfAnalogInStatusSamplesValid(self.hdwf, byref(cValid))
        for i in range(self.numChannels):
            self.dwf.FDwfAnalogInStatusData(self.hdwf,c_int(self.ch[i]),byref(self.samples[i]),cValid)
        self.voltage = np.zeros((self.numChannels,self.size),float)
        for i in range(self.numChannels):
            self.voltage[i,:] = np.fromiter(self.samples[i],float)
        return cValid.value,self.voltage				 
					 

Par défaut, l'acquisition démarre immédiatement. La fonction analogTrigger permet de configurer un déclenchement à partir d'un front sur une des entrées analogiques (voie 0 ou 1 pour l'Analog Discovery 2) :

    def analogTrigger(self,ch,level=0.0,hysteresis=0.01,type='edge',condition='rise',position=0.0,timeout=10):
        if ch=='none':
            self.dwf.FDwfAnalogInTriggerSourceSet(self.hdwf,trigsrcNone)
            return
        self.dwf.FDwfAnalogInTriggerAutoTimeoutSet(self.hdwf, c_double(timeout))
        self.dwf.FDwfAnalogInTriggerSourceSet(self.hdwf, trigsrcDetectorAnalogIn)
        self.dwf.FDwfAnalogInTriggerTypeSet(self.hdwf, c_int(self.triggerType[type]))
        self.dwf.FDwfAnalogInTriggerChannelSet(self.hdwf,c_int(ch))
        self.dwf.FDwfAnalogInTriggerLevelSet(self.hdwf, c_double(level))
        self.dwf.FDwfAnalogInTriggerHysteresisSet(self.hdwf, c_double(hysteresis))
        self.dwf.FDwfAnalogInTriggerConditionSet(self.hdwf, c_int(self.triggerCondition[condition]))
        self.dwf.FDwfAnalogInTriggerPositionSet(self.hdwf, c_double(position))			   
				   

L'analog Discovery 2 possède deux entrées Trigger (T1 et T2) permettant de déclencher l'acquisition à partir d'un signal externe binaire. La fonction externalTrigger configure le déclenchement sur une de ces deux entrées :

    def externalTrigger(self,extTriCh,level=0.0,hysteresis=0.01,type='edge',condition='rise',position=0.0,timeout=10):
        if extTriCh==1:
            src = trigsrcExternal1
        elif extTriCh==2:
            src = trigsrcExternal2
        else:
            return
        self.dwf.FDwfAnalogInTriggerAutoTimeoutSet(self.hdwf, c_double(timeout))
        self.dwf.FDwfAnalogInTriggerSourceSet(self.hdwf, src)
        self.dwf.FDwfAnalogInTriggerTypeSet(self.hdwf, c_int(self.triggerType[type]))
        self.dwf.FDwfAnalogInTriggerLevelSet(self.hdwf, c_double(level))
        self.dwf.FDwfAnalogInTriggerHysteresisSet(self.hdwf, c_double(hysteresis))
        self.dwf.FDwfAnalogInTriggerConditionSet(self.hdwf, c_int(self.triggerCondition[condition]))
        self.dwf.FDwfAnalogInTriggerPositionSet(self.hdwf, c_double(position))				
					

4. Générateur de signaux

L'Analog Discovery 2 possède deux générateurs de signaux DDS 14 bits de fréquence d'échantillonnage maximale 100 MS/s, de tension maximale +/- 5 V et de courant maximal +/- 100 mA. Ces générateurs de signaux offre des possibilités de programmation très supérieures à un générateur DDS de laboratoire autonome bien que sa gamme de tensions et de courants de sortie soit deux fois plus basse.

La classe AnalogOutput permet de programmer ces générateurs.

class AnalogOutput:
    def __init__(self,device):
        self.dev = device
        self.dwf = device.dwf
        self.hdwf = device.hdwf
        self.functions = {'DC':0,'sine':1,'square':2,'triangle':3,'rampUp':4,'rampDown':5,'noise':6}
        self.node = {'carrier':c_long(0),'FM':c_long(1),'AM':c_long(2)}
        self.size = [0,0]
        self.iPlay = [0,0]
        self.playStarted = [False,False]
        self.buffer = [0,0]
			

La configuration d'un générateur se fait en définissant trois nœuds : la porteuse (carrier) et éventuellement la modulation d'amplitude (AM) et/ou la modulation de fréquence (FM). Chacun de ces 3 nœuds a son propre tampon.

La fonction function permet de définir un signal d'une forme prédéfinie (sine, square, triangle, rampUp, rampDown ou noise). On précise la fréquence, l'amplitude et l'offset, comme on le ferait sur un générateur de signaux de laboratoire.

    def function(self,ch,func,freq,amp,offset=0,phase=0,symmetry=50,node='carrier'):
        analogOutNode = self.node[node]
        self.dwf.FDwfAnalogOutNodeEnableSet(self.hdwf, c_int(ch), analogOutNode, c_bool(True))
        self.dwf.FDwfAnalogOutNodeFunctionSet(self.hdwf, c_int(ch), analogOutNode,c_ubyte(self.functions[func]))
        self.dwf.FDwfAnalogOutNodeFrequencySet(self.hdwf, c_int(ch), analogOutNode, c_double(freq))
        self.dwf.FDwfAnalogOutNodeAmplitudeSet(self.hdwf, c_int(ch), analogOutNode, c_double(amp))
        self.dwf.FDwfAnalogOutNodeOffsetSet(self.hdwf, c_int(ch), analogOutNode, c_double(offset))
        self.dwf.FDwfAnalogOutNodePhaseSet(self.hdwf, c_int(ch), analogOutNode, c_double(phase))
        self.dwf.FDwfAnalogOutNodeSymmetrySet(self.hdwf, c_int(ch), analogOutNode, c_double(symmetry))		  
			  

Cette fonction ne fait que configurer la sortie, elle ne la déclenche pas.

La fonction waveform permet de définir une forme d'onde à partir d'un tableau.

    def waveform(self,ch,samples,freq,amp,periods=0,offset=0,phase=0,symmetry=50,node='carrier',repeat=0):
        analogOutNode = self.node[node]
        self.dwf.FDwfAnalogOutNodeEnableSet(self.hdwf, c_int(ch), analogOutNode, c_bool(True))
        self.dwf.FDwfAnalogOutNodeFunctionSet(self.hdwf, c_int(ch), analogOutNode,funcCustom)
        self.dwf.FDwfAnalogOutNodeFrequencySet(self.hdwf, c_int(ch), analogOutNode, c_double(freq))
        self.dwf.FDwfAnalogOutNodeAmplitudeSet(self.hdwf, c_int(ch), analogOutNode, c_double(amp))
        self.dwf.FDwfAnalogOutNodeOffsetSet(self.hdwf, c_int(ch), analogOutNode, c_double(offset))
        self.dwf.FDwfAnalogOutNodePhaseSet(self.hdwf, c_int(ch), analogOutNode, c_double(phase))
        self.dwf.FDwfAnalogOutNodeSymmetrySet(self.hdwf, c_int(ch), analogOutNode, c_double(symmetry))
        Ns = len(samples)
        rgdSamples = (c_double*Ns)()
        for i in range(Ns):
            rgdSamples[i] = samples[i]
        self.dwf.FDwfAnalogOutNodeDataSet(self.hdwf, c_int(ch), analogOutNode,rgdSamples,c_int(Ns))
        self.dwf.FDwfAnalogOutRepeatSet(self.hdwf, c_int(ch),repeat)
        if periods!=0:
            self.dwf.FDwfAnalogOutRunSet(self.hdwf,c_int(ch),c_double(periods/freq))
			  

Remarque : pour obtenir un signal permanent sans durée définie, on pourra choisi periods=1 et repeat=0.

La fonction start déclenche une voie :

    def start(self,ch):
        self.dwf.FDwfAnalogOutConfigure(self.hdwf, c_int(ch), c_int(1))		
			  

La fonction stop stoppe une voie :

    def stop(self,ch):
        self.dwf.FDwfAnalogOutConfigure(self.hdwf, c_int(ch), c_int(0))
				

Il est possible de déclencher les deux voies et de les stopper avec les fonctions startAll et stopAll :

    def startAll(self):
        self.dwf.FDwfAnalogOutConfigure(self.hdwf, c_int(-1), c_int(1))
    def stopAll(self):
        self.dwf.FDwfAnalogOutConfigure(self.hdwf, c_int(-1), c_int(0))		  
			  

Lorsque les deux sorties sont déclenchées par startAll, elles ne sont pas synchrones. Pour obtenir deux sorties synchrones, il faut qu'une des deux soit déclarée master, l'autre slave. Le déclenchement de la sortie master déclenche simultanément celui de la sortie slave. La fonction setMasterpermet de configurer cette dépendance :

    def setMaster(self,ch,chMaster):
        self.dwf.FDwfAnalogOutMasterSet(self.hdwf,c_int(ch),c_int(chMaster))		
				

Lorsqu'on modifie la configuration d'une voie en cours de fonctionnement (avec function ou waveform), les changements ne prennent pas effet immédiatement. Il faut pour cela soit stopper la sortie puis la relancer, soit plus simplement utiliser la fonction apply ou bien applyAll :

    def apply(self,ch):
        self.dwf.FDwfAnalogOutConfigure(self.hdwf, c_int(ch), c_int(2))
    def applyAll(self,ch):
        self.dwf.FDwfAnalogOutConfigure(self.hdwf, c_int(-1), c_int(2))
				 

Il est possible de déclencher une sortie à partir du front montant sur une entrée Trigger (T1 ou T2) :

    def externalTrigger(self,ch,extTriCh):
        if extTriCh==1:
            src = trigsrcExternal1
        elif extTriCh==2:
            src = trigsrcExternal2
        else:
            return
        self.dwf.FDwfAnalogOutTriggerSourceSet(self.hdwf, c_int(ch), src)		 
				 

Il est possible d'envoyer des données sur un nœud en flux continu, à une fréquence d'échantillonnage déterminée. On peut par exemple lire un fichier audio et envoyer les échantillons vers une sortie à 44 kHz. La fonction configPlay configure ce mode de fonctionnement :

    def configPlay(self,ch,freq,amp,size,offset=0,node='carrier'):
        analogOutNode = self.node[node]
        self.size[ch] = size
        self.iPlay[ch] = 0
        self.dwf.FDwfAnalogOutNodeEnableSet(self.hdwf, ch, analogOutNode, c_bool(True))
        self.dwf.FDwfAnalogOutNodeFunctionSet(self.hdwf, ch, 0, funcPlay)
        self.dwf.FDwfAnalogOutRepeatSet(self.hdwf, ch, c_int(1))
        self.dwf.FDwfAnalogOutRunSet(self.hdwf, ch, c_double(size/freq))
        self.dwf.FDwfAnalogOutNodeFrequencySet(self.hdwf, ch, analogOutNode, c_double(freq))
        self.dwf.FDwfAnalogOutNodeAmplitudeSet(self.hdwf, ch, analogOutNode, c_double(amp))
        self.dwf.FDwfAnalogOutNodeOffsetSet(self.hdwf, c_int(ch), analogOutNode, c_double(offset))
        cBuffer = c_int(0)
        self.dwf.FDwfAnalogOutNodeDataInfo(self.hdwf, ch, analogOutNode, 0, byref(cBuffer))
        self.bufferSize = cBuffer.value
        self. playStarted[ch] = False
        self.buffer[ch] = np.zeros(0)			  
				  

La fonction playData envoie un tableau d'échantillons vers un nœud, qui doit être configuré au préabalable avec la fonction configPlay.

    def playData(self,ch,data,node='carrier',waitTrigger=False):
        analogOutNode = self.node[node]
        dataLost = c_int(0)
        dataFree = c_int(0)
        dataCorrupted = c_int(0)
        sts = c_ubyte(0)
        totalLost = 0
        totalCorrupted = 0
        dataNumSamp = data.size
        data_c = (c_double*data.size)(*data)
        iData = 0
        
        if not(self.playStarted[ch]):
            availableBuffer = self.bufferSize - self.iPlay[ch] # place restante dans le buffer
            if availableBuffer > 0:
                if dataNumSamp > availableBuffer:
                    self.buffer[ch] = np.append(self.buffer[ch],data[0:availableBuffer])
                    self.iPlay[ch] += availableBuffer
                    iData  = availableBuffer
                    dataNumSamp -= availableBuffer
                    availableBuffer = 0
                else:
                    self.buffer[ch] = np.append(self.buffer[ch],data)
                    self.iPlay[ch] += dataNumSamp
                    return # le buffer n'est pas plein
            if availableBuffer == 0: # le buffer est plein : on peut remplir le buffer de l'AD et démarrer
                self.dwf.FDwfAnalogOutNodeDataSet(self.hdwf, ch, analogOutNode, (c_double*self.bufferSize)(*self.buffer[ch]), c_int(self.bufferSize))
                self.dwf.FDwfAnalogOutConfigure(self.hdwf, ch, c_bool(True))
                self.playStarted[ch]=True
        
        while dataNumSamp > 0:
            if self.dwf.FDwfAnalogOutStatus(self.hdwf, ch, byref(sts)) != 1:
                print("Error")
                szerr = create_string_buffer(512)
                self.dwf.FDwfGetLastErrorMsg(szerr)
                print(szerr.value)
                return 0
            if waitTrigger:
                if sts.value == 1: # state = armed
                    while sts.value != 3: # attente du déclenchement
                        self.dwf.FDwfAnalogOutStatus(self.hdwf, ch, byref(sts))
            if sts.value != 3:
                print('not running',dataNumSamp)
                return 0 # not running !DwfStateRunning
            if self.iPlay[ch] >= self.size[ch] :
                print("oversized data")
                return 0 
            self.dwf.FDwfAnalogOutNodePlayStatus(self.hdwf, ch, analogOutNode, byref(dataFree), byref(dataLost), byref(dataCorrupted))
            totalLost += dataLost.value
            totalCorrupted += dataCorrupted.value
            if dataFree.value > dataNumSamp:
                dataFree.value = dataNumSamp
            if dataFree.value > 0 :
                if self.dwf.FDwfAnalogOutNodePlayData(self.hdwf, ch, analogOutNode,byref(data_c,iData*8), dataFree) != 1:
                    print("Error")
                    return 0
                if dataLost.value!=0 : print("lost : %d"%dataLost.value)
                self.iPlay[ch] += dataFree.value
                dataNumSamp -= dataFree.value
                iData += dataFree.value			  
				  

Le nombre total d'échantillons étant size, il est possible d'envoyer successivement P paquets de taille size/P chacun. Il faut évidemment les envoyer à un rythme convenable, c'est-à-dire qu'un paquet doit être envoyé avant que l'émission en sortie du paquet précédent soit terminé.

5. Exemples

5.a. Génération d'un signal périodique

Le script suivant permet de générer un signal périodique de forme standard sur la sortie W1.

generationSignal.py
import numpy as np
from analog_0_1 import Device,AnalogInput,AnalogOutput
device = Device(-1)
device.open()
output = AnalogOutput(device)
freq = 10e3
amp = 2.0
output.function(0,'sine',freq,amp)
output.start(0)
r = input("Stopper ?")
output.stop(0)
device.close()
				

Le script suivant permet de générer un signal périodique défini par une forme d'onde arbitraire :

generationFormeOnde.py
import numpy as np
import matplotlib.pyplot as plt

from analog_0_1 import Device,AnalogInput,AnalogOutput
device = Device(-1)
device.open()
output = AnalogOutput(device)
freq = 10e3
amp = 1.0
def signal(theta):
    return 5*np.sin(theta)+1*np.cos(3*theta)
Ne = 1000
theta = np.arange(Ne)*2*np.pi/Ne
samples = signal(theta)
samples /= samples.max()
periods = 1 
repeat = 0
output.waveform(0,samples,freq,amp,periods=periods,repeat=repeat)
output.start(0)
plt.figure()
plt.plot(samples)
plt.show()
output.stop(0)
device.close()			
				

Le script suivant permet de générer deux signaux périodiques de forme standard synchrones :

generationDeuxSignaux.py
import numpy as np
from analog_0_1 import Device,AnalogInput,AnalogOutput
device = Device(-1)
device.open()
output = AnalogOutput(device)
freq = 10e3
amp = 2.0
output.function(0,'sine',freq,amp,phase=0)
output.function(1,'sine',freq,amp,phase=90) # signaux déphasés de 90 °
output.setMaster(1,0)
output.start(0)
r = input("Stopper ?")
output.stop(0)
device.close()	
				

5.b. Génération d'une porteuse modulée

Le script suivant génère une porteuse modulée en amplitude ou en fréquence :

generationModulationAM.py
import numpy as np
from analog_0_1 import Device,AnalogInput,AnalogOutput
device = Device(-1)
device.open()
output = AnalogOutput(device)
freq = 10e3 # fréquence de la porteuse
amp = 2.0
output.function(0,'sine',freq,amp)
am_freq = 100 # fréquence de modulation
am_amp = 50 # indice de modulation en %
output.function(0,'sine',am_freq,am_amp,node='AM')
output.start(0)
r = input("Stopper ?")
output.stop(0)
device.close()
				

Pour obtenir une modulation de fréquence, il suffit d'écrire node='FM' dans l'appel de output.function.

5.c. Numérisation d'un signal

Le script suivant effectue la numérisation d'un signal sur la voie 0 (CH1) et son analyse spectrale. Le signal est généré par le générateur de l'AD (sortie W1). Il faut donc relier la sortie W1 à l'entrée CH1.

generationNumerisation.py
import numpy as np
import matplotlib.pyplot as plt
from analog_0_1 import Device,AnalogInput,AnalogOutput
from numpy.fft import fft
from scipy.signal import blackman

def analyseSpectrale(t,x,nz=5):
	# nz : nombre de blocs de zéros ajoutés
    te = t[1]-t[0]
    N=len(x)
    p = int(np.log(nz*N)/np.log(2))
    N1 = 2**p
    x1=np.concatenate((x*blackman(N),np.zeros(N1-N)))
    spectre = np.absolute(fft(x1))*2.0/N/0.42
    N1 = len(x1)
    T1 = N1*te
    freq=np.arange(N1)*1/T1
    plt.figure()
    plt.plot(freq,spectre)
    plt.xlabel('f (Hz)')
    plt.grid()
   

device = Device(-1)
device.open()
output = AnalogOutput(device)
freq = 10e3
amp = 2
output.function(0,'triangle',freq,amp,offset=0.0,phase=0)
output.start(0)
analog = AnalogInput(device)
analog.channels([0],[5])
fSamp = freq*100
size = 8192
time = analog.sampling(fSamp,size)
voltage = analog.record()
u0 = voltage[0,:]
analyseSpectrale(time,u0,nz=10)
plt.figure()
plt.plot(time,u0,label='ch 1')
plt.xlabel('t (s)')
plt.ylabel('U (V)')
plt.grid()
plt.legend(loc='upper right')
plt.show()
device.close()
				 

5.d. Numérisation en mode asynchrone

Le script suivant effectue une numérisation sur la voie CH1 en mode Record asynchrone. Il comporte une animation qui lit les échantillons à intervalle de temps régulier et complète le tracé du signal au fur et à mesure.

numerisationRecordAsynchrone.py
import numpy as np
import matplotlib.pyplot as plt
from analog_0_1 import Device,AnalogInput
from matplotlib.animation import FuncAnimation

device = Device(-1)
device.open()
analog = AnalogInput(device)
Umax = 5
analog.channels([0,1],[Umax,Umax])
fSamp = 1e3
duree = 60
size = int(fSamp*duree)
time = analog.sampling(fSamp,size)

fig,ax = plt.subplots()
line0, = ax.plot(time,np.zeros(len(time)))
ax.grid()
ax.set_xlabel("t (s)")
ax.set_ylabel("u0 (V)")
ax.axis([0,duree,-Umax,Umax])
signal = np.zeros((0))
analog.startRecord()

def animate(i):
    global analog,line0,time,signal,zi
    numAvailable,end,voltage = analog.getAvailable()
    if numAvailable > 0:
        paquet = voltage[0,end-numAvailable:end]
        signal = np.append(signal,paquet)
        line0.set_xdata(time[0:end])
        line0.set_ydata(signal)
interval = 200 # intervalle de temps de lecture des données en ms
ani = FuncAnimation(fig,animate,1,interval=interval,repeat=True)
plt.show()
device.close()
				 

5.e. Oscilloscope

Le script suivant effectue une numérisation de 8192 (taille maximale du tampon) sur les deux voies CH1 et CH2 et affiche en temps réel les deux signaux. Le déclenchement se fait sur la voie 0.

oscilloscope.py
import numpy as np
import matplotlib.pyplot as plt
from analog_0_1 import Device,AnalogInput
from matplotlib.animation import FuncAnimation

device = Device(-1)
device.open()
analog = AnalogInput(device)
Umax = 5
analog.channels([0,1],[Umax,Umax])
fSamp = 50e3
size = 8192
T = size/fSamp
t = analog.sampling(fSamp,size)	
analog.analogTrigger(0,level=0.0,type='edge',condition='rise')
voltage = analog.record()

fig,ax = plt.subplots()
line0, = ax.plot(t,voltage[0,:])
line1, = ax.plot(t,voltage[1,:])
ax.grid()
ax.set_xlabel("t (s)")
ax.set_ylabel("u0 (V)")
ax.axis([0,T,-Umax,Umax])

def animate(i):
    global analog,line0
    voltage = analog.record()
    line0.set_ydata(voltage[0,:])
    line1.set_ydata(voltage[1,:])

interval = 100 # intervalle de temps de rafraichissement des données en ms
ani = FuncAnimation(fig,animate,1,interval=interval,repeat=True)
plt.show()
device.close()
				

Contrairement à ce qui est fait dans l'exemple précédent, cet exemple ne fait pas d'enregistrement des échantillons sur une durée définie : il se contente d'afficher une fenêtre temporelle avec un certain taux de rafraichissement.

5.f. Acquisition en mode Scan Shift

Le script ci-dessous effectue une acquisition en mode Scan Shift. Le contenu du tampon est lu à un intervalle de temps arbitraire et les courbes sont tracées.

ocquisitionScanShif.py
import numpy as np
import matplotlib.pyplot as plt
from analog import Device,AnalogInput
from matplotlib.animation import FuncAnimation


device = Device(-1)
device.open()
analog = AnalogInput(device)
Umax = 5
analog.channels([0,1],[Umax,Umax])
fSamp = 1e3
size = 10000
duree = size/fSamp
print("duree = %f"%duree)
t = analog.sampling(fSamp,size)
analog.startScanShift()

fig,ax = plt.subplots()
line0, = ax.plot(t,np.zeros(len(t)))
line1, = ax.plot(t,np.zeros(len(t)))
ax.grid()
ax.set_xlabel("t (s)")
ax.set_ylabel("u (V)")
ax.axis([0,duree,-Umax,Umax])

def animate(i):
    global analog,line0,t
    valid,voltage = analog.getBuffer()
    if valid > 0:
        line0.set_xdata(t[0:valid])
        line0.set_ydata(voltage[0,0:valid])
        line1.set_xdata(t[0:valid])
        line1.set_ydata(voltage[1,0:valid])

interval = 100
ani = FuncAnimation(fig,animate,1,interval=interval,repeat=True)
plt.show()
device.close()			
				

Ce mode n'apporte rien par rapport au mode Record mais il facilite le tracé des derniers échantillons numérisés. Si l'on souhaite stocker les signaux (et pas seulement les tracer), il faut utiliser le mode Record.

Creative Commons LicenseTextes et figures sont mis à disposition sous contrat Creative Commons.