Skip to content

Manual Processing tools

reformat_event_labels(subject, session, task, data_directory, annotations_directory, extension=None)

This script takes the events files, reads the timestamps in, and organizes them suitably for the data_viewer. Outputs the events as a csv file :param subject: (str). Subject :param session: (str). Session :param task: (srt). Task the subject completed in this session :param data_directory: (Path). The Path object that points to the neuralynx data directory :param annotations_directory: (Path). The Path object that points to where the annotations file will go. :param extension: str optional. :return:

Source code in intracranial_ephys_utils/manual_process.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def reformat_event_labels(subject, session, task, data_directory, annotations_directory, extension=None):
    """
    This script takes the events files, reads the timestamps in, and organizes them suitably for
    the data_viewer. Outputs the events as a csv file
    :param subject: (str). Subject
    :param session: (str). Session
    :param task: (srt). Task the subject completed in this session
    :param data_directory: (Path). The Path object that points to the neuralynx data directory
    :param annotations_directory: (Path). The Path object that points to where the annotations file will go.
    :param extension: str optional.
    :return:
    """
    event_times, event_labels, global_start, event_files = get_event_times(data_directory, extension=extension)
    if len(event_times) == 0:
        source_epoch = pd.DataFrame(np.array([[], [], []]).T, columns=['time', 'duration', 'label'])
    else:
        event_times_sec = event_times*10**-6-global_start
        durations = np.ones((event_times_sec.shape[0], ))*0.5
        source_epoch = pd.DataFrame(np.array([event_times_sec, durations, event_labels]).T, columns=['time', 'duration',
                                                                                                     'label'])

    file_root, _ = os.path.splitext(event_files[0])
    annotations_file = f'{subject}_{session}_{task}_{file_root}.csv'
    if annotations_file in os.listdir(annotations_directory):
        print('Annotations File exists, so nothing was written. Double check to see if it matches expectation.')
    else:
        source_epoch.to_csv(annotations_directory / annotations_file, index=False)
    return event_files[0]

photodiode_check_viewer(subject, session, task, data_directory, annotations_directory, diagnostic=False, task_start=0.0, events_filename=None)

This script is a generalized dataviewer to look at a photodiode signal, and bring up the events. With the viewer, we can make annotations and save them to a csv file. Additionally, if the diagnostic optional parameter is set to True, this function will also preprocess the photodiode to check that the signal on TTL is good. :param subject: (string) The patient ID :param session: (string) Session of the experiment. Useful if patient completed more than one session of a task. :param task: (string) Which task :param data_directory: (Path object) Where the data lives :param annotations_directory: (Path object) Where we want to put the annotations of the data :param diagnostic: (bool) (optional) If True we will also plot diagnostics, and preprocess photodiode and overlay it. :param task_start: (float) (optional) The start time of the task :param events_filename: (str) (optional) The extension of the file (in case multiple datasets in the same folder) :return:

Source code in intracranial_ephys_utils/manual_process.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def photodiode_check_viewer(subject, session, task, data_directory, annotations_directory, diagnostic=False,
                            task_start=0., events_filename=None):
    """
    This script is a generalized dataviewer to look at a photodiode signal, and bring up the events.
    With the viewer, we can make annotations and save them to a csv file. Additionally, if the diagnostic optional
    parameter is set to True, this function will also preprocess the photodiode to check that the signal on TTL is good.
    :param subject: (string) The patient ID
    :param session: (string) Session of the experiment. Useful if patient completed more than one session of a task.
    :param task: (string) Which task
    :param data_directory: (Path object) Where the data lives
    :param annotations_directory: (Path object) Where we want to put the annotations of the data
    :param diagnostic: (bool) (optional) If True we will also plot diagnostics, and preprocess photodiode and overlay
    it.
    :param task_start: (float) (optional) The start time of the task
    :param events_filename: (str) (optional) The extension of the file (in case multiple datasets in the same folder)
    :return:
    """

    # Underlying assumption is that data is organized as subject/session/raw, this is where the .ncs files live
    all_files_list = os.listdir(data_directory)
    # electrode_files = [file_path for file_path in all_files_list if (re.match('m.*ncs', file_path) and not
    #                file_path.endswith(".nse"))]

    ext = events_filename.replace('.nev', '.ncs')
    ext = ext.replace('events', '')
    ext = ext.replace('Events', '')
    if ext is not None and ext != '.ncs':
        warnings.warn(f"Most likely multiple photodiode files, picking {ext} now")
        ph_files = [file_path for file_path in all_files_list if file_path.endswith(ext) and
                    (file_path.startswith('photo1') or file_path.startswith('Photo') or file_path.startswith('PH_Diode'))]
    else:
        ph_files = [file_path for file_path in all_files_list if file_path.endswith('.ncs') and
                    (file_path.startswith('photo1') or file_path.startswith('Photo') or file_path.startswith('PH_Diode'))]
        if len(ph_files) > 1:
            warnings.warn("Multiple photodiode files picking the base one now")
            ph_files = [file for file in ph_files if (file.endswith('photo1.ncs') or file.endswith('photo.ncs') or file.startswith('PH_Diode'))]
    assert len(ph_files) == 1
    ph_filename = ph_files[0]

    # We'll read in the photodiode signal
    ph_signal, sampling_rate, interp, timestamps = read_task_ncs(data_directory, ph_filename)

    # we'll make some sanity plots to check photodiode at a glance, and preprocess to double-check event trigger
    # is good
    if diagnostic:
        diagnostic_time_series_plot(ph_signal, sampling_rate, electrode_name='Photodiode')
        # Use the diagnostic plot to limit the time interval we process photodiode in
        start_time = input('Type start time (in seconds) if not the start of signal, else press enter: ')
        end_time = input('Type end time (in seconds) if not the end of signal, else press enter: ')
        if len(start_time) == 0:
            start_time = 0
        else:
            start_time = int(start_time)
        if len(end_time) == 0:
            end_time = int(ph_signal.shape[0])
        else:
            end_time = int(end_time)

        ph_signal = ph_signal[int(start_time*sampling_rate):int(sampling_rate * end_time)]
        # timestamps = timestamps[int(start_time*sampling_rate):int(sampling_rate * end_time)]
        t_start = start_time

        # next step to this is to add my thresholding for photodiode
        ph_signal_bin = binarize_ph(ph_signal, sampling_rate)
        dataset = np.vstack([ph_signal, ph_signal_bin]).T
        labels = np.array([ph_filename, 'Photodiode Binarized'])
    else:
        t_start = task_start
        dataset = np.expand_dims(ph_signal, axis=1)
        labels = np.expand_dims(np.array([ph_filename]), axis=1)

    app = mkQApp()

    # Create main window that can contain several viewers
    win = MainViewer(debug=True, show_auto_scale=True)

    # Create a viewer for signal
    # T_start essentially rereferences the start time of the dataset, but leaves the annotations alone
    # be wary of this
    view1 = TraceViewer.from_numpy(dataset, sampling_rate, t_start, 'Photodiode', channel_names=labels)

    # TO DO
    # Figure out a better way to scale the different signals when presented
    # view1.params['scale_mode'] = 'same_for_all'
    view1.auto_scale()
    win.add_view(view1)

    # annotation file details
    possible_labels = [f'{task} duration']
    file_root, _ = os.path.splitext(events_filename)
    file_path = annotations_directory / f'{subject}_{session}_{task}_{file_root}.csv'
    source_epoch = CsvEpochSource(file_path, possible_labels)

    # create a viewer for the encoder itself
    view2 = EpochEncoder(source=source_epoch, name='Tagging events')
    win.add_view(view2)

    # show main window and run Qapp
    win.show()
    app.exec()

photodiode_event_check_viewer(subject, session, task, dataset, labels, sampling_rate, annotations_directory)

This code will plot the ph_signal, the binarized signal and the computed event onsets and offsets with the goal of getting the types of events properly annotated. :param dataset: (np.array) :parma labels: (np.array) - label associated with dataset :param sampling_rate: (float) :param event_onsets: (np.array) :param event_offsets: (np.array) :param annotations_directory: (Path object) :return: .csv file

Source code in intracranial_ephys_utils/manual_process.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def photodiode_event_check_viewer(subject, session, task, dataset, labels, sampling_rate, annotations_directory):
    """
    This code will plot the ph_signal, the binarized signal and the computed event onsets and offsets
    with the goal of getting the types of events properly annotated.
    :param dataset: (np.array)
    :parma labels: (np.array) - label associated with dataset
    :param sampling_rate: (float)
    :param event_onsets: (np.array)
    :param event_offsets: (np.array)
    :param annotations_directory: (Path object)
    :return: .csv file
    """
    t_start = 0 # change and make an argument if needed
    app = mkQApp()

    # Create main window that can contain several viewers
    win = MainViewer(debug=True, show_auto_scale=True)

    # Create a viewer for signal
    # T_start essentially rereferences the start time of the dataset, but leaves the annotations alone
    # be wary of this
    view1 = TraceViewer.from_numpy(dataset, sampling_rate, t_start, 'Photodiode Signals', channel_names=labels)

    # TO DO
    # Figure out a better way to scale the different signals when presented
    # view1.params['scale_mode'] = 'same_for_all'
    view1.auto_scale()
    win.add_view(view1)

    # annotation file details
    possible_labels = ['block start', 'trial start', 'feedback', 'event']
    events_filename = annotations_directory / f'{subject}_{session}_{task}_ph_events.csv'

    source_epoch = CsvEpochSource(events_filename, possible_labels)

    # create a viewer for the encoder itself
    view2 = EpochEncoder(source=source_epoch, name='Tagging events')
    win.add_view(view2)

    # show main window and run Qapp
    win.show()
    app.exec()

data_clean_viewer(subject, session, task, annotations_directory, electrode_names, dataset, fs)

This function serves to look at lfp signals and look at which is the reference or to look at the macrowires and clean the data for epileptic activity Current: Only array functionality, so data should be hard loaded as an array for this function to work :param subject: (string) subject id :param session: (string) session id :param task: (string) task id :param annotations_directory: (Path) path object :param electrode_names: (1d array) :param dataset: (n_electrodes, n_timepoints) :param fs: (int) :return:

Source code in intracranial_ephys_utils/manual_process.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def data_clean_viewer(subject, session, task, annotations_directory, electrode_names, dataset, fs):
    """
    This function serves to look at lfp signals and look at which is the reference or to look at the
    macrowires and clean the data for epileptic activity
    Current: Only array functionality, so data should be hard loaded as an array for this function to work
    :param subject: (string) subject id
    :param session: (string) session id
    :param task: (string) task id
    :param annotations_directory: (Path) path object
    :param electrode_names: (1d array)
    :param dataset: (n_electrodes, n_timepoints)
    :param fs: (int)
    :return:
    """
    possible_labels = ['bad epileptic activity macro', 'bad eight hertz noise', 'bad epileptic activity micro',
                       'bad epileptic activity both', 'reference electrode', 'microPED electrode',
                       'white noise electrode', 'clipping noise electrode', 'epileptic macrocontact',
                       'white noise macrocontact']
    cleaning_task = 'seizure_data_cleaning'
    file_path = annotations_directory / f'{subject}_{session}_{task}_{cleaning_task}_events.csv'
    source_epoch = CsvEpochSource(file_path, possible_labels)

    t_start = 0.
    # you must first create a main Qt application (for event loop)
    app = mkQApp()

    # Create the main window that can contain several viewers
    win = MainViewer(debug=True, show_auto_scale=True)

    # create a viewer for signal
    view1 = TraceViewer.from_numpy(dataset.T, fs, t_start, 'Microwires', channel_names=electrode_names)
    # view1 = TraceViewer.from_neo_analogsignal(analog_signals, 'sigs')
    view1.params['scale_mode'] = 'same_for_all'
    view1.auto_scale()
    win.add_view(view1)

    # create a viewer for the encoder itself
    view2 = EpochEncoder(source=source_epoch, name='Tagging events')
    win.add_view(view2)

    #
    # view3 = EventList(source=source_epoch, name='events')
    # win.add_view(view3)
    # show main window and run Qapp
    win.show()

    app.exec()

write_timestamps(subject, session, task, event_folder, annotations_directory, local_data_directory, events_filename)

Looks in event folders for labels. Takse labels and converts them to machine time and places into a .txt file useful for spike sorting. :param subject: (string) Subject identifier :param session: (string) Session identifier (1 if subject only completed one run of the experiment. :param task: (string) Task identifier. The task or experiment subject completed. :param annotations_directory: This is the folder where manual annotations are found :param event_folder: This is the folder where events live (helpful to get global machine time) :param local_data_directory: This is where the microwire data to be sorted is :param events_filename: If multiple events exist :return: None. A txt file is generated with relative timestamps if needed, or not if not needed.

Source code in intracranial_ephys_utils/manual_process.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
def write_timestamps(subject, session, task, event_folder, annotations_directory, local_data_directory,
                     events_filename):
    """
    Looks in event folders for labels. Takse labels and converts them to machine time and places into a .txt file useful
    for spike sorting.
    :param subject: (string) Subject identifier
    :param session: (string) Session identifier (1 if subject only completed one run of the experiment.
    :param task: (string) Task identifier. The task or experiment subject completed.
    :param annotations_directory: This is the folder where manual annotations are found
    :param event_folder: This is the folder where events live (helpful to get global machine time)
    :param local_data_directory:  This is where the microwire data to be sorted is
    :param events_filename: If multiple events exist
    :return: None. A txt file is generated with relative timestamps if needed, or not if not needed.
    """
    file_root, _ = os.path.splitext(events_filename)
    labels_file = pd.read_csv(annotations_directory / f'{subject}_{session}_{task}_{file_root}.csv')
    task_label = labels_file[labels_file.label == f"{task} duration"]
    if len(task_label) == 0:
        raise ValueError('No label found for task. Please re-run script to find task relevant data.')
    start_time_sec = task_label['time'].iloc[0].astype(float)
    end_time_sec = start_time_sec + task_label['duration'].iloc[0].astype(float)

    ext = events_filename[-6:]
    _, _, global_start, _ = get_event_times(event_folder, extension=ext)
    # microsec_sec = 10**-6
    sec_microsec = 10**6
    start_time_machine = (start_time_sec + global_start) * sec_microsec
    end_time_machine = (end_time_sec + global_start) * sec_microsec
    timestamps_file = local_data_directory / f"timestampsInclude.txt"
    specified_file = annotations_directory / f"which_files.txt"
    # generate directory if it doesn't exist
    if not os.path.exists(local_data_directory):
        os.mkdir(local_data_directory)
    with open(timestamps_file, 'w+') as f:
        f.write(f'{int(start_time_machine)}    {int(end_time_machine)}')
    with open(specified_file, 'w+') as f:
        f.write(f'{file_root}')

su_timestamp_process(subject, session, task, data_directory, annotations_directory, results_directory)

Master script that runs basic pipeline to get timestamps file for sorting minimally using OSort Matlab scripts. :param subject: (str). Subject identifier :param session: (str). Session identifier, useful if subject ran more than one session. :param task: (str). Task identifier. The task the subject ran. :param data_directory: (Path). Path object to data where events file and photodiode file lives :param annotations_directory: (Path). Path object that points to where we'd like to store annotations and metadata. :param results_directory: (Path). Path object that points to where the timestampInclude.txt file will end up. Ideally ends up in spike_sorting folder alongside 'raw' folder that contains microwire .ncs files

Source code in intracranial_ephys_utils/manual_process.py
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
def su_timestamp_process(subject, session, task, data_directory, annotations_directory, results_directory):
    """
    Master script that runs basic pipeline to get timestamps file for sorting minimally using OSort Matlab scripts.
    :param subject: (str). Subject identifier
    :param session: (str). Session identifier, useful if subject ran more than one session.
    :param task: (str). Task identifier. The task the subject ran.
    :param data_directory: (Path). Path object to data where events file and photodiode file lives
    :param annotations_directory: (Path). Path object that points to where we'd like to store annotations and metadata.
    :param results_directory: (Path). Path object that points to where the timestampInclude.txt file will end up. Ideally
    ends up in spike_sorting folder alongside 'raw' folder that contains microwire .ncs files
    """
    # double check if there are multiple events files
    _, _, global_start, event_files = get_event_times(data_directory)

    # separate pipelines for one vs multiple events files
    if len(event_files) > 1 and type(event_files) == type([]):
        print('Multiple Events Files, we will go through the separate datasets one at a time')
        print(event_files)
        for event_file in event_files:
            print(f'Loading event file: {event_file}')
            ext = event_file[-6:]
            # print(ext)
            reformat_event_labels(subject, session, task, data_directory, annotations_directory, extension=ext)
            photodiode_check_viewer(subject, session, task, data_directory, annotations_directory, diagnostic=False,
                                    events_filename=event_file)
            file_root, _ = os.path.splitext(event_file)
            labels_file = pd.read_csv(annotations_directory / f'{subject}_{session}_{task}_{file_root}.csv')
            task_label = labels_file[labels_file.label == f"{task} duration"]
            if len(task_label) == 0:
                continue
            else:
                target_file = event_file
                print(f'You have determined that {event_file} has task relevant data')
                break
    else:
        print('One Event File, loading now')
        reformat_event_labels(subject, session, task, data_directory, annotations_directory)
        photodiode_check_viewer(subject, session, task, data_directory, annotations_directory, diagnostic=False,
                                events_filename=event_files[0])
        file_root, _ = os.path.splitext(event_files[0])
        labels_file = pd.read_csv(annotations_directory / f'{subject}_{session}_{task}_{file_root}.csv')
        task_label = labels_file[labels_file.label == f"{task} duration"]
        target_file = event_files[0]

    try:
        print('Writing machine time timestamps for the task to spike_sorting folder')
        write_timestamps(subject, session, task, data_directory, annotations_directory, results_directory,
                         events_filename=target_file)
    except NameError:
        raise NameError("No events file seems to have been made. Maybe you didn't add a task event?")

get_annotated_task_start_time(subject, session, task, annotations_directory)

This function serves as a helper function to grab the start and end times of the task, after annotating the photodiode script. Will only work if annotations file exists, and duration event has been made. Recall that loading in .ncs files relies on loading in many segments of variable length, so we typically load in more data than what start and end time would have you believe :param subject: :param session: :param task: :param annotations_directory: :return: start_time_sec (float) Start time in seconds for the task. Reference is the start of the file recording. :return: end_time_sec (float) End time in seconds for the task. Reference is the start of the file recording. :return: duration (float) Duration in seconds for the task.

Source code in intracranial_ephys_utils/manual_process.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
def get_annotated_task_start_time(subject, session, task, annotations_directory):
    """
    This function serves as a helper function to grab the start and end times of the task, after annotating the
    photodiode script. Will only work if annotations file exists, and duration event has been made.
    Recall that loading in .ncs files relies on loading in many segments of variable length, so we typically load in
    more data than what start and end time would have you believe
    :param subject:
    :param session:
    :param task:
    :param annotations_directory:
    :return: start_time_sec (float) Start time in seconds for the task. Reference is the start of the file recording.
    :return: end_time_sec (float) End time in seconds for the task. Reference is the start of the file recording.
    :return: duration (float) Duration in seconds for the task.
    """
    labels_file = pd.read_csv(annotations_directory / f'{subject}_{session}_{task}_events.csv')
    task_label = labels_file[labels_file.label == f"{task} duration"]
    start_time_sec = task_label['time'].iloc[0].astype(float)
    end_time_sec = start_time_sec + task_label['duration'].iloc[0].astype(float)
    return start_time_sec, end_time_sec, task_label['duration'].iloc[0].astype(float)